This is an automated email from the ASF dual-hosted git repository.
pratik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c565a83df1 Enforce removeSegment flow with
_enableDeletedKeysCompactionConsistency (#13914)
c565a83df1 is described below
commit c565a83df1c683510a45edb7755a72702bee6537
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Tue Sep 17 13:13:24 2024 +0530
Enforce removeSegment flow with _enableDeletedKeysCompactionConsistency
(#13914)
* Enforce removeSegment flow with _enableDeletedKeysCompactionConsistency
* fix ut and move comparisonTies resolution for full upsert inside subclass
---
...nUpsertMetadataManagerForConsistentDeletes.java | 55 +++-
...ertMetadataManagerForConsistentDeletesTest.java | 297 +++++++++++----------
2 files changed, 206 insertions(+), 146 deletions(-)
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
index 4221809039..8507d00cc9 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
@@ -19,6 +19,7 @@
package org.apache.pinot.segment.local.upsert;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -28,6 +29,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ServerMeter;
@@ -35,6 +37,7 @@ import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImp
import org.apache.pinot.segment.local.segment.readers.LazyRow;
import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
import org.apache.pinot.segment.local.utils.HashUtils;
+import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
@@ -46,7 +49,8 @@ import org.roaringbitmap.buffer.MutableRoaringBitmap;
/**
* Implementation of {@link PartitionUpsertMetadataManager} that is backed by
a {@link ConcurrentHashMap} and ensures
- * consistent deletions. This should be used when the table is configured with
'enableConsistentDeletes' set to true.
+ * consistent deletions. This should be used when the table is configured with
'enableDeletedKeysCompactionConsistency'
+ * set to true.
*
* Consistent deletion ensures that when deletedKeysTTL is enabled with
UpsertCompaction, the key metadata is
* removed from the HashMap only after all other records in the old segments
are compacted. This guarantees
@@ -81,6 +85,12 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
protected void doAddOrReplaceSegment(ImmutableSegmentImpl segment,
ThreadSafeMutableRoaringBitmap validDocIds,
@Nullable ThreadSafeMutableRoaringBitmap queryableDocIds,
Iterator<RecordInfo> recordInfoIterator,
@Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap
validDocIdsForOldSegment) {
+ if (_partialUpsertHandler == null) {
+ // For full upsert, de-duplicate primary keys once here so that a key appearing more than once in
+ // this segment is added only once, matching the single subtraction done in removeSegment.
+ // For partial upsert, this tie resolution is already done in the base class.
+ recordInfoIterator = resolveComparisonTies(recordInfoIterator,
_hashFunction);
+ }
String segmentName = segment.getSegmentName();
segment.enableUpsert(this, validDocIds, queryableDocIds);
@@ -209,6 +219,49 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
}
+  /**
+   * Replaces {@code oldSegment} with {@code segment} and then always removes {@code oldSegment}, all while holding
+   * the segment lock obtained from {@link SegmentLocks} for this table and segment name.
+   *
+   * <p>When {@code recordInfoIterator} is non-null, the new segment's records are applied through
+   * {@code addOrReplaceSegment}; a missing {@code validDocIds} bitmap is created, and a missing
+   * {@code queryableDocIds} bitmap is created only when a delete record column is configured.
+   *
+   * <p>For partial-upsert tables, any valid docs left in the old segment after the replacement are logged and
+   * reported through {@link ServerMeter#PARTIAL_UPSERT_KEYS_NOT_REPLACED}.
+   *
+   * @param segment the new segment; must be an {@link ImmutableSegmentImpl} when {@code recordInfoIterator} is given
+   * @param validDocIds valid doc ids of the new segment, or {@code null} to have an empty bitmap created
+   * @param queryableDocIds queryable doc ids of the new segment, or {@code null}
+   * @param recordInfoIterator records of the new segment, or {@code null} to skip adding records
+   * @param oldSegment the segment being replaced; it is always removed at the end
+   * @throws IllegalArgumentException if {@code recordInfoIterator} is non-null and {@code segment} is not an
+   *     {@link ImmutableSegmentImpl}
+   */
+  @Override
+  public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
+      @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, @Nullable Iterator<RecordInfo> recordInfoIterator,
+      IndexSegment oldSegment) {
+    String segmentName = segment.getSegmentName();
+    Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
+    segmentLock.lock();
+    try {
+      MutableRoaringBitmap validDocIdsForOldSegment =
+          oldSegment.getValidDocIds() != null ? oldSegment.getValidDocIds().getMutableRoaringBitmap() : null;
+      if (recordInfoIterator != null) {
+        // Guava Preconditions only substitutes %s placeholders; SLF4J-style {} would be emitted verbatim.
+        Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
+            "Got unsupported segment implementation: %s for segment: %s, table: %s", segment.getClass(),
+            segmentName, _tableNameWithType);
+        if (validDocIds == null) {
+          validDocIds = new ThreadSafeMutableRoaringBitmap();
+        }
+        if (queryableDocIds == null && _deleteRecordColumn != null) {
+          queryableDocIds = new ThreadSafeMutableRoaringBitmap();
+        }
+        addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, queryableDocIds, recordInfoIterator,
+            oldSegment, validDocIdsForOldSegment);
+      }
+      if (validDocIdsForOldSegment != null && !validDocIdsForOldSegment.isEmpty() && _partialUpsertHandler != null) {
+        int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
+        // For partial-upsert table, because we do not restore the original record location when removing the primary
+        // keys not replaced, it can potentially cause inconsistency between replicas. This can happen when a
+        // consuming segment is replaced by a committed segment that is consumed from a different server with
+        // different records (some stream consumer cannot guarantee consuming the messages in the same order).
+        _logger.warn("Found {} primary keys not replaced when replacing segment: {} for partial-upsert table. This "
+            + "can potentially cause inconsistency between replicas", numKeysNotReplaced, segmentName);
+        _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
+            numKeysNotReplaced);
+      }
+      // With enableDeletedKeysCompactionConsistency = true, always remove the old segment. This accounts for the
+      // primary keys held by the to-be-removed segment and reduces their distinctSegmentCount by 1.
+      doRemoveSegment(oldSegment);
+    } finally {
+      segmentLock.unlock();
+    }
+  }
+
@Override
protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey>
primaryKeyIterator) {
// We need to decrease the distinctSegmentCount for each unique primary
key in this deleting segment by 1
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
index 4a6ab831cb..f448bd1976 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
@@ -73,9 +73,9 @@ import static org.testng.Assert.*;
/**
* This class tries to replicate the behaviour for {@code
ConcurrentMapPartitionUpsertMetadataManagerTest} assuming
- * that _enableConsistentDeletes is enabled, and accordingly we set all the
params in {@code setUpContextBuilder}.
- * We have removed preload and metadataTTL unit-tests for now as we don't
allow them along with
- * _enableConsistentDeletes.
+ * that _enableDeletedKeysCompactionConsistency is enabled, and accordingly we
set all the params in
+ * {@code setUpContextBuilder}. We have removed preload and metadataTTL
unit-tests for now as we don't allow
+ * them along with _enableDeletedKeysCompactionConsistency.
*/
public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest {
private static final String RAW_TABLE_NAME = "testTable";
@@ -90,7 +90,7 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
private static ImmutableSegmentImpl mockImmutableSegment(int sequenceNumber,
ThreadSafeMutableRoaringBitmap validDocIds, @Nullable
ThreadSafeMutableRoaringBitmap queryableDocIds,
- List<PrimaryKey> primaryKeys) {
+ @Nullable List<PrimaryKey> primaryKeys) {
ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class);
when(segment.getSegmentName()).thenReturn(getSegmentName(sequenceNumber));
when(segment.getValidDocIds()).thenReturn(validDocIds);
@@ -105,6 +105,9 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
when(segmentMetadata.getIndexCreationTime()).thenReturn(System.currentTimeMillis());
+ if (primaryKeys != null) {
+ when(segmentMetadata.getTotalDocs()).thenReturn(primaryKeys.size());
+ }
when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
return segment;
}
@@ -129,13 +132,14 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
when(segmentMetadata.getIndexCreationTime()).thenReturn(creationTimeMs);
+ when(segmentMetadata.getTotalDocs()).thenReturn(primaryKeys.size());
when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
return segment;
}
private static ImmutableSegmentImpl
mockImmutableSegmentWithSegmentMetadata(int sequenceNumber,
ThreadSafeMutableRoaringBitmap validDocIds, @Nullable
ThreadSafeMutableRoaringBitmap queryableDocIds,
- List<PrimaryKey> primaryKeys, SegmentMetadataImpl segmentMetadata,
MutableRoaringBitmap snapshot) {
+ @Nullable List<PrimaryKey> primaryKeys, SegmentMetadataImpl
segmentMetadata, MutableRoaringBitmap snapshot) {
ImmutableSegmentImpl segment = mockImmutableSegment(sequenceNumber,
validDocIds, queryableDocIds, primaryKeys);
when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
when(segment.loadValidDocIdsFromSnapshot()).thenReturn(snapshot);
@@ -171,13 +175,15 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
private static void checkRecordLocation(
Map<Object,
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation>
recordLocationMap,
- int keyValue, IndexSegment segment, int docId, int comparisonValue,
HashFunction hashFunction) {
+ int keyValue, IndexSegment segment, int docId, int comparisonValue, int
distinctSegmentCount,
+ HashFunction hashFunction) {
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation
recordLocation =
recordLocationMap.get(HashUtils.hashPrimaryKey(makePrimaryKey(keyValue),
hashFunction));
assertNotNull(recordLocation);
assertSame(recordLocation.getSegment(), segment);
assertEquals(recordLocation.getDocId(), docId);
- assertEquals(((Integer) recordLocation.getComparisonValue()),
comparisonValue);
+ assertEquals(((Integer) recordLocation.getComparisonValue()).intValue(),
comparisonValue);
+ assertEquals(recordLocation.getDistinctSegmentCount(),
distinctSegmentCount);
}
@BeforeClass
@@ -321,14 +327,14 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
int[] docIds1 = new int[]{2, 4, 5};
MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
validDocIdsSnapshot1.add(docIds1);
- recordInfoList1 = getRecordInfoList(validDocIdsSnapshot1, primaryKeys,
timestamps, null);
+ recordInfoList1 = getRecordInfoList(primaryKeys.length, primaryKeys,
timestamps, null);
upsertMetadataManager.addSegment(segment1, validDocIds1, null,
recordInfoList1.iterator());
trackedSegments.add(segment1);
// segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
assertEquals(recordLocationMap.size(), 3);
- checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 1,
hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{2, 4, 5});
// Add the second segment
@@ -348,17 +354,17 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
// segment1 snapshot: 1 -> {4, 120}
MutableRoaringBitmap validDocIdsSnapshot2 = new MutableRoaringBitmap();
validDocIdsSnapshot2.add(0, 2, 3);
- recordInfoList2 = getRecordInfoList(validDocIdsSnapshot2, primaryKeys,
timestamps, null);
+ recordInfoList2 = getRecordInfoList(primaryKeys.length, primaryKeys,
timestamps, null);
upsertMetadataManager.addSegment(segment2, validDocIds2, null,
recordInfoList2.iterator());
trackedSegments.add(segment2);
// segment1: 1 -> {4, 120}
// segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
assertEquals(recordLocationMap.size(), 4);
- checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+ checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, 1,
hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 2, 3});
@@ -368,10 +374,10 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
// segment1: 1 -> {4, 120}
// segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
assertEquals(recordLocationMap.size(), 4);
- checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+ checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, 1,
hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 2, 3});
@@ -379,7 +385,6 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
ThreadSafeMutableRoaringBitmap newValidDocIds1 = new
ThreadSafeMutableRoaringBitmap();
SegmentMetadataImpl newSegmentMetadata1 = mock(SegmentMetadataImpl.class);
when(newSegmentMetadata1.getIndexCreationTime()).thenReturn(System.currentTimeMillis());
- when(newSegmentMetadata1.getTotalDocs()).thenReturn(primaryKeys1.size());
ImmutableSegmentImpl newSegment1 =
mockImmutableSegmentWithSegmentMetadata(1, newValidDocIds1, null,
primaryKeys1, newSegmentMetadata1, null);
upsertMetadataManager.replaceSegment(newSegment1, newValidDocIds1, null,
recordInfoList1.iterator(), segment1);
@@ -389,23 +394,24 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
// segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
// new segment1: 1 -> {4, 120}
assertEquals(recordLocationMap.size(), 4);
- checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120,
hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+ checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, 1,
hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 2, 3});
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
// Remove the original segment1
+ // This is a no-op: the replaceSegment call above already removed segment1
upsertMetadataManager.removeSegment(segment1);
// segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
// new segment1: 1 -> {4, 120}
assertEquals(recordLocationMap.size(), 4);
- checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120,
hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+ checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, 1,
hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 2, 3});
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
@@ -415,10 +421,10 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
// segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
// new segment1: 1 -> {4, 120}
assertEquals(recordLocationMap.size(), 4);
- checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120,
hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+ checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, 1,
hashFunction);
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 2, 3});
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
@@ -427,7 +433,7 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
// segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} (not in the map)
// new segment1: 1 -> {4, 120}
assertEquals(recordLocationMap.size(), 1);
- checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 1,
hashFunction);
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 2, 3});
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
assertEquals(trackedSegments, Collections.singleton(newSegment1));
@@ -439,7 +445,7 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
upsertMetadataManager.removeSegment(newSegment1);
// new segment1: 1 -> {4, 120}
assertEquals(recordLocationMap.size(), 1);
- checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 1,
hashFunction);
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
assertEquals(trackedSegments, Collections.singleton(newSegment1));
@@ -482,9 +488,9 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
trackedSegments.add(segment1);
// segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
assertEquals(recordLocationMap.size(), 3);
- checkRecordLocation(recordLocationMap, 0, segment1, 5, 100,
HashFunction.NONE);
- checkRecordLocation(recordLocationMap, 1, segment1, 4, 120,
HashFunction.NONE);
- checkRecordLocation(recordLocationMap, 2, segment1, 2, 100,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, 1,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 1,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 1,
HashFunction.NONE);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{2, 4, 5});
// Add the second segment of uploaded name format with same creation time
@@ -503,10 +509,10 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
// segment1: 1 -> {4, 120}, 2 -> {2, 100}
// uploadedSegment2: 0 -> {0, 100}, 3 -> {1, 80}
assertEquals(recordLocationMap.size(), 4);
- checkRecordLocation(recordLocationMap, 0, uploadedSegment2, 0, 100,
HashFunction.NONE);
- checkRecordLocation(recordLocationMap, 1, segment1, 4, 120,
HashFunction.NONE);
- checkRecordLocation(recordLocationMap, 2, segment1, 2, 100,
HashFunction.NONE);
- checkRecordLocation(recordLocationMap, 3, uploadedSegment2, 1, 80,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 0, uploadedSegment2, 0, 100, 2,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 1,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 1,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 3, uploadedSegment2, 1, 80, 1,
HashFunction.NONE);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{2, 4});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
@@ -522,10 +528,10 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
// segment1: 1 -> {4, 120}, 2 -> {2, 100}
// newUploadedSegment2: 0 -> {0, 100}, 3 -> {1, 80}
assertEquals(recordLocationMap.size(), 4);
- checkRecordLocation(recordLocationMap, 0, newUploadedSegment2, 0, 100,
HashFunction.NONE);
- checkRecordLocation(recordLocationMap, 1, segment1, 4, 120,
HashFunction.NONE);
- checkRecordLocation(recordLocationMap, 2, segment1, 2, 100,
HashFunction.NONE);
- checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 1, 80,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 0, newUploadedSegment2, 0, 100, 2,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 1,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 1,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 1, 80, 1,
HashFunction.NONE);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{2, 4});
assertEquals(newValidDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
@@ -545,10 +551,10 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
// newUploadedSegment2: 3 -> {1, 80}
// uploadedSegment3: 0 -> {0, 100}
assertEquals(recordLocationMap.size(), 4);
- checkRecordLocation(recordLocationMap, 0, uploadedSegment3, 0, 100,
HashFunction.NONE);
- checkRecordLocation(recordLocationMap, 1, segment1, 4, 120,
HashFunction.NONE);
- checkRecordLocation(recordLocationMap, 2, segment1, 2, 100,
HashFunction.NONE);
- checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 1, 80,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 0, uploadedSegment3, 0, 100, 3,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 1,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 1,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 1, 80, 1,
HashFunction.NONE);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{2, 4});
assertEquals(newValidDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{1});
assertEquals(validDocIds3.getMutableRoaringBitmap().toArray(), new
int[]{0});
@@ -570,10 +576,10 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
// uploadedSegment3: 0 -> {0, 100}
// uploadedSegment4: 1 -> {1, 120}
assertEquals(recordLocationMap.size(), 4);
- checkRecordLocation(recordLocationMap, 0, uploadedSegment3, 0, 100,
HashFunction.NONE);
- checkRecordLocation(recordLocationMap, 1, uploadedSegment4, 1, 120,
HashFunction.NONE);
- checkRecordLocation(recordLocationMap, 2, segment1, 2, 100,
HashFunction.NONE);
- checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 1, 80,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 0, uploadedSegment3, 0, 100, 4,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 1, uploadedSegment4, 1, 120, 2,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 1,
HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 1, 80, 1,
HashFunction.NONE);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{2});
assertEquals(newValidDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{1});
assertEquals(validDocIds3.getMutableRoaringBitmap().toArray(), new
int[]{0});
@@ -617,14 +623,14 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
int[] docIds1 = new int[]{2, 4, 5};
MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
validDocIdsSnapshot1.add(docIds1);
- recordInfoList1 = getRecordInfoList(validDocIdsSnapshot1, primaryKeys,
timestamps, deleteFlags);
+ recordInfoList1 = getRecordInfoList(primaryKeys.length, primaryKeys,
timestamps, deleteFlags);
upsertMetadataManager.addSegment(segment1, validDocIds1, queryableDocIds1,
recordInfoList1.iterator());
trackedSegments.add(segment1);
// segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
assertEquals(recordLocationMap.size(), 3);
- checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 1,
hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{2, 4, 5});
assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{2, 5});
@@ -646,17 +652,17 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
// segment1 snapshot: 1 -> {4, 120}
MutableRoaringBitmap validDocIdsSnapshot2 = new MutableRoaringBitmap();
validDocIdsSnapshot2.add(0, 2, 3);
- recordInfoList2 = getRecordInfoList(validDocIdsSnapshot2, primaryKeys,
timestamps, deleteFlags);
+ recordInfoList2 = getRecordInfoList(primaryKeys.length, primaryKeys,
timestamps, deleteFlags);
upsertMetadataManager.addSegment(segment2, validDocIds2, queryableDocIds2,
recordInfoList2.iterator());
trackedSegments.add(segment2);
// segment1: 1 -> {4, 120}
// segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
assertEquals(recordLocationMap.size(), 4);
- checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+ checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, 1,
hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 2, 3});
assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
@@ -668,10 +674,10 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
// segment1: 1 -> {4, 120}
// segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
assertEquals(recordLocationMap.size(), 4);
- checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+ checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, 1,
hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 2, 3});
assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
@@ -690,10 +696,10 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
// segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
// new segment1: 1 -> {4, 120}
assertEquals(recordLocationMap.size(), 4);
- checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120,
hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+ checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, 1,
hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 2, 3});
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
@@ -702,14 +708,15 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
// Remove the original segment1
+ // this will be a no-op as segment1 is already removed from trackedSegments
upsertMetadataManager.removeSegment(segment1);
// segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
// new segment1: 1 -> {4, 120}
assertEquals(recordLocationMap.size(), 4);
- checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120,
hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+ checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, 1,
hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 2, 3});
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
@@ -722,10 +729,10 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
// segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
// new segment1: 1 -> {4, 120}
assertEquals(recordLocationMap.size(), 4);
- checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120,
hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+ checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, 1,
hashFunction);
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 2, 3});
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 3});
@@ -736,7 +743,7 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
// segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} (not in the map)
// new segment1: 1 -> {4, 120}
assertEquals(recordLocationMap.size(), 1);
- checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 1,
hashFunction);
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 2, 3});
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
assertEquals(trackedSegments, Collections.singleton(newSegment1));
@@ -750,7 +757,7 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
upsertMetadataManager.removeSegment(newSegment1);
// new segment1: 1 -> {4, 120}
assertEquals(recordLocationMap.size(), 1);
- checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 1,
hashFunction);
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
assertEquals(trackedSegments, Collections.singleton(newSegment1));
assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
@@ -834,10 +841,10 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
// segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
// segment2: 3 -> {0, 100}
- checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, 1,
hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 1, 2});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0});
@@ -845,10 +852,10 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
// segment1: 0 -> {0, 100}, 1 -> {1, 120}
// segment2: 2 -> {1, 120}, 3 -> {0, 100}
- checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, 1,
hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
@@ -856,10 +863,10 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
// segment1: 0 -> {0, 100}, 1 -> {1, 120}
// segment2: 2 -> {1, 120}, 3 -> {0, 100}
- checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, 1,
hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
@@ -867,10 +874,10 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
// segment1: 1 -> {1, 120}
// segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
- checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, 1,
hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{1});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 1, 3});
@@ -881,10 +888,10 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
upsertMetadataManager.addRecord(segment2, new
RecordInfo(makePrimaryKey(0), 4, 120, false));
// segment1: 1 -> {1, 120}
// segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
- checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, 1,
hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{1});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 1, 3});
@@ -904,7 +911,7 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
throws IOException {
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
upsertMetadataManager =
new
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(REALTIME_TABLE_NAME,
0,
- _contextBuilder.setHashFunction(hashFunction).build());
+
_contextBuilder.setHashFunction(hashFunction).setDropOutOfOrderRecord(true).build());
Map<Object,
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation>
recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -930,10 +937,10 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
// segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
// segment2: 3 -> {0, 100}
- checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, 1,
hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 1, 2});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0});
@@ -947,10 +954,10 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
// segment1: 0 -> {0, 100}, 1 -> {1, 120}
// segment2: 3 -> {0, 100}, 2 -> {1, 150}
- checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment2, 1, 150, hashFunction);
+ checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 1, 150, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, 1,
hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
@@ -996,10 +1003,10 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
// segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
// segment2: 3 -> {0, 100}
- checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, 1,
hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 1, 2});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0});
assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 1, 2});
@@ -1010,10 +1017,10 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
// segment1: 0 -> {0, 100}, 1 -> {1, 120}
// segment2: 2 -> {1, 120}, 3 -> {0, 100}
- checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, 1,
hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
@@ -1024,10 +1031,10 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
// segment1: 0 -> {0, 100}, 1 -> {1, 120}
// segment2: 2 -> {1, 120}, 3 -> {2, 150}
- checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 3, segment2, 2, 150, hashFunction);
+ checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 2, 150, 1,
hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{1, 2});
assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
@@ -1038,10 +1045,10 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
// segment1: 0 -> {0, 100}, 1 -> {1, 120}
// segment2: 2 -> {1, 120}, 3 -> {3, 200}
- checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 3, segment2, 3, 200, hashFunction);
+ checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 3, 200, 1,
hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{1, 3});
assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
@@ -1054,10 +1061,10 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
upsertMetadataManager.addRecord(segment2, new
RecordInfo(makePrimaryKey(0), 4, 120, false));
// segment1: 0 -> {0, 100}, 1 -> {1, 120}
// segment2: 2 -> {1, 120}, 3 -> {3, 200}
- checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 3, segment2, 3, 200, hashFunction);
+ checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 3, 200, 1,
hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{1, 3});
assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
@@ -1103,10 +1110,10 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
// segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
// segment2: 3 -> {0, 100}
- checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, 1,
hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 1, 2});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0});
assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 1, 2});
@@ -1117,10 +1124,10 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
// segment1: 0 -> {0, 100}, 1 -> {1, 120}
// segment2: 2 -> {1, 150}, 3 -> {0, 100}
- checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment2, 1, 150, hashFunction);
- checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 1, 150, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, 1,
hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
@@ -1136,10 +1143,10 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
// in the same segment.
// segment1: 0 -> {0, 100}
// segment2: 1 -> {3, 120}, 2 -> {1, 150}, 3 -> {2, 120}
- checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 1, segment2, 3, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment2, 1, 150, hashFunction);
- checkRecordLocation(recordLocationMap, 3, segment2, 2, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment2, 3, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 1, 150, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 2, 120, 1,
hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{1, 2, 3});
assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0});
@@ -1152,9 +1159,9 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
upsertMetadataManager.removeExpiredPrimaryKeys();
// segment1: 0 -> {0, 100}
// segment2: 1 -> {3, 120}, 2 -> {1, 150}
- checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 1, segment2, 3, 120, hashFunction);
- checkRecordLocation(recordLocationMap, 2, segment2, 1, 150, hashFunction);
+ checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, 1,
hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment2, 3, 120, 2,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 1, 150, 2,
hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{1, 3});
assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0});
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]