This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9c1bb02dec Misc fixes for upsert metadata manager (#12319)
9c1bb02dec is described below
commit 9c1bb02decc32f5e685c69667a87e1bf7621fb2e
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Jan 24 15:59:12 2024 -0800
Misc fixes for upsert metadata manager (#12319)
---
.../upsert/BasePartitionUpsertMetadataManager.java | 52 +++++++++++-----------
...oncurrentMapPartitionUpsertMetadataManager.java | 43 ++++++++----------
2 files changed, 45 insertions(+), 50 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index b63f58e013..aca199659d 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.upsert;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AtomicDouble;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
@@ -91,7 +92,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
// Used to maintain the largestSeenComparisonValue to avoid handling
out-of-ttl segments/records.
// If upsertTTL enabled, we will keep track of largestSeenComparisonValue to
compute expired segments.
- protected volatile double _largestSeenComparisonValue;
+ protected final AtomicDouble _largestSeenComparisonValue;
// The following variables are always accessed within synchronized block
private boolean _stopped;
@@ -116,9 +117,9 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
_serverMetrics = ServerMetrics.get();
_logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId +
"-" + getClass().getSimpleName());
if (_metadataTTL > 0) {
- _largestSeenComparisonValue = loadWatermark();
+ _largestSeenComparisonValue = new AtomicDouble(loadWatermark());
} else {
- _largestSeenComparisonValue = Double.MIN_VALUE;
+ _largestSeenComparisonValue = new AtomicDouble(Double.MIN_VALUE);
deleteWatermark();
}
}
@@ -166,17 +167,17 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
double maxComparisonValue =
((Number)
segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0))
.getMaxValue()).doubleValue();
- _largestSeenComparisonValue = Math.max(_largestSeenComparisonValue,
maxComparisonValue);
+ _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v,
maxComparisonValue));
}
// Skip adding segment that has max comparison value smaller than
(largestSeenComparisonValue - TTL)
- if (_metadataTTL > 0 && _largestSeenComparisonValue > 0) {
+ if (_metadataTTL > 0 && _largestSeenComparisonValue.get() > 0) {
Preconditions.checkState(_enableSnapshot, "Upsert TTL must have snapshot
enabled");
Preconditions.checkState(_comparisonColumns.size() == 1,
"Upsert TTL does not work with multiple comparison columns");
Number maxComparisonValue =
(Number)
segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
- if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue -
_metadataTTL) {
+ if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue.get()
- _metadataTTL) {
_logger.info("Skip adding segment: {} because it's out of TTL",
segmentName);
MutableRoaringBitmap validDocIdsSnapshot =
immutableSegment.loadValidDocIdsFromSnapshot();
if (validDocIdsSnapshot != null) {
@@ -245,11 +246,20 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
// Update metrics
long numPrimaryKeys = getNumPrimaryKeys();
+ updatePrimaryKeyGauge(numPrimaryKeys);
+ _logger.info("Finished adding segment: {} in {}ms, current primary key
count: {}", segmentName,
+ System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
+ }
+
+ protected abstract long getNumPrimaryKeys();
+
+ protected void updatePrimaryKeyGauge(long numPrimaryKeys) {
_serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId,
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
numPrimaryKeys);
+ }
- _logger.info("Finished adding segment: {} in {}ms, current primary key
count: {}", segmentName,
- System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
+ protected void updatePrimaryKeyGauge() {
+ updatePrimaryKeyGauge(getNumPrimaryKeys());
}
@Override
@@ -275,7 +285,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
}
}
- private void doPreloadSegment(ImmutableSegmentImpl segment) {
+ protected void doPreloadSegment(ImmutableSegmentImpl segment) {
String segmentName = segment.getSegmentName();
_logger.info("Preloading segment: {}, current primary key count: {}",
segmentName, getNumPrimaryKeys());
long startTimeMs = System.currentTimeMillis();
@@ -301,8 +311,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
// Update metrics
long numPrimaryKeys = getNumPrimaryKeys();
- _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId,
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
- numPrimaryKeys);
+ updatePrimaryKeyGauge(numPrimaryKeys);
_logger.info("Finished preloading segment: {} in {}ms, current primary key
count: {}", segmentName,
System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
}
@@ -347,8 +356,6 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
}
}
- protected abstract long getNumPrimaryKeys();
-
protected abstract void addOrReplaceSegment(ImmutableSegmentImpl segment,
ThreadSafeMutableRoaringBitmap validDocIds,
@Nullable ThreadSafeMutableRoaringBitmap queryableDocIds,
Iterator<RecordInfo> recordInfoIterator,
@Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap
validDocIdsForOldSegment);
@@ -378,8 +385,8 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
}
/**
- Returns {@code true} when the record is added to the upsert metadata
manager,
- {@code false} when the record is out-of-order thus not added.
+ * Returns {@code true} when the record is added to the upsert metadata
manager, {@code false} when the record is
+ * out-of-order thus not added.
*/
protected abstract boolean doAddRecord(MutableSegment segment, RecordInfo
recordInfo);
@@ -433,9 +440,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
// Update metrics
long numPrimaryKeys = getNumPrimaryKeys();
- _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId,
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
- numPrimaryKeys);
-
+ updatePrimaryKeyGauge(numPrimaryKeys);
_logger.info("Finished replacing segment: {} in {}ms, current primary key
count: {}", segmentName,
System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
}
@@ -506,10 +511,10 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
return;
}
// Skip removing segment that has max comparison value smaller than
(largestSeenComparisonValue - TTL)
- if (_metadataTTL > 0 && _largestSeenComparisonValue > 0) {
+ if (_metadataTTL > 0 && _largestSeenComparisonValue.get() > 0) {
Number maxComparisonValue =
(Number)
segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
- if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue -
_metadataTTL) {
+ if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue.get()
- _metadataTTL) {
_logger.info("Skip removing segment: {} because it's out of TTL",
segmentName);
return;
}
@@ -556,9 +561,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
// Update metrics
long numPrimaryKeys = getNumPrimaryKeys();
- _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId,
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
- numPrimaryKeys);
-
+ updatePrimaryKeyGauge(numPrimaryKeys);
_logger.info("Finished removing segment: {} in {}ms, current primary key
count: {}", segmentName,
System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
}
@@ -793,8 +796,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
// We don't remove the segment from the metadata manager when
// it's closed. This was done to make table deletion faster. Since we
don't remove the segment, we never decrease
// the primary key count. So, we set the primary key count to 0 here.
- _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId,
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
- 0L);
+ updatePrimaryKeyGauge(0);
_logger.info("Closed the metadata manager");
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index 576d679368..887582538c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
-import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.utils.LLCSegmentName;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
@@ -223,22 +222,22 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
@Override
public void doRemoveExpiredPrimaryKeys() {
- AtomicInteger numDeletedTTLKeysRemoved = new AtomicInteger();
AtomicInteger numMetadataTTLKeysRemoved = new AtomicInteger();
+ AtomicInteger numDeletedTTLKeysRemoved = new AtomicInteger();
+ double largestSeenComparisonValue = _largestSeenComparisonValue.get();
double metadataTTLKeysThreshold;
if (_metadataTTL > 0) {
- metadataTTLKeysThreshold = _largestSeenComparisonValue - _metadataTTL;
+ metadataTTLKeysThreshold = largestSeenComparisonValue - _metadataTTL;
} else {
metadataTTLKeysThreshold = Double.MIN_VALUE;
}
-
double deletedKeysThreshold;
-
if (_deletedKeysTTL > 0) {
- deletedKeysThreshold = _largestSeenComparisonValue - _deletedKeysTTL;
+ deletedKeysThreshold = largestSeenComparisonValue - _deletedKeysTTL;
} else {
deletedKeysThreshold = Double.MIN_VALUE;
}
+
_primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
double comparisonValue = ((Number)
recordLocation.getComparisonValue()).doubleValue();
if (_metadataTTL > 0 && comparisonValue < metadataTTLKeysThreshold) {
@@ -255,29 +254,25 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
}
});
if (_metadataTTL > 0) {
- persistWatermark(_largestSeenComparisonValue);
+ persistWatermark(largestSeenComparisonValue);
}
- int numDeletedTTLKeys = numDeletedTTLKeysRemoved.get();
- if (numDeletedTTLKeys > 0) {
- _logger.info("Deleted {} primary keys based on deletedKeysTTL in the
table {}", numDeletedTTLKeys,
- _tableNameWithType);
- _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.DELETED_KEYS_TTL_PRIMARY_KEYS_REMOVED,
- numDeletedTTLKeys);
- }
+ // Update metrics
+ updatePrimaryKeyGauge();
int numMetadataTTLKeys = numMetadataTTLKeysRemoved.get();
if (numMetadataTTLKeys > 0) {
- _logger.info("Deleted {} primary keys based on metadataTTL in the table
{}", numMetadataTTLKeys,
- _tableNameWithType);
+ _logger.info("Deleted {} primary keys based on metadataTTL",
numMetadataTTLKeys);
_serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.METADATA_TTL_PRIMARY_KEYS_REMOVED,
numMetadataTTLKeys);
}
+ int numDeletedTTLKeys = numDeletedTTLKeysRemoved.get();
+ if (numDeletedTTLKeys > 0) {
+ _logger.info("Deleted {} primary keys based on deletedKeysTTL",
numDeletedTTLKeys);
+ _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.DELETED_KEYS_TTL_PRIMARY_KEYS_REMOVED,
+ numDeletedTTLKeys);
+ }
}
- /**
- Returns {@code true} when the record is added to the upsert metadata
manager,
- {@code false} when the record is out-of-order thus not added.
- */
@Override
protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo)
{
AtomicBoolean isOutOfOrderRecord = new AtomicBoolean(false);
@@ -289,7 +284,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
// When TTL is enabled, update largestSeenComparisonValue when adding new
record
if (_metadataTTL > 0 || _deletedKeysTTL > 0) {
double comparisonValue = ((Number) newComparisonValue).doubleValue();
- _largestSeenComparisonValue = Math.max(_largestSeenComparisonValue,
comparisonValue);
+ _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v,
comparisonValue));
}
_primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(),
_hashFunction),
@@ -310,8 +305,8 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
}
return new RecordLocation(segment, newDocId, newComparisonValue);
} else {
+ // Out-of-order record
handleOutOfOrderEvent(currentRecordLocation.getComparisonValue(),
recordInfo.getComparisonValue());
- // this is a out-of-order record then set value to true - this
indicates whether out-of-order or not
isOutOfOrderRecord.set(true);
return currentRecordLocation;
}
@@ -322,9 +317,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
}
});
- // Update metrics
- _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId,
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
- _primaryKeyToRecordLocationMap.size());
+ updatePrimaryKeyGauge();
return !isOutOfOrderRecord.get();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org