This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 12bf942c30 [upsert] Ensure consistent creation time to prevent data
inconsistency across replicas (#16034)
12bf942c30 is described below
commit 12bf942c30d6a7bce5c10b326f0f27adf8cdeb07
Author: tarun11Mavani <[email protected]>
AuthorDate: Thu Jun 12 01:53:18 2025 +0530
[upsert] Ensure consistent creation time to prevent data inconsistency
across replicas (#16034)
---
.../realtime/RealtimeSegmentDataManager.java | 4 +--
.../manager/realtime/RealtimeTableDataManager.java | 41 ++++++++++++++++++++--
.../upsert/BasePartitionUpsertMetadataManager.java | 22 ++++++++++++
...oncurrentMapPartitionUpsertMetadataManager.java | 4 +--
...nUpsertMetadataManagerForConsistentDeletes.java | 4 +--
...ertMetadataManagerForConsistentDeletesTest.java | 5 ++-
...rrentMapPartitionUpsertMetadataManagerTest.java | 5 ++-
.../spi/index/metadata/SegmentMetadataImpl.java | 20 +++++++++++
8 files changed, 94 insertions(+), 11 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 27406557fb..5ee7a41c45 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1257,7 +1257,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
return false;
}
- _realtimeTableDataManager.replaceConsumingSegment(_segmentNameStr);
+ _realtimeTableDataManager.replaceConsumingSegment(_segmentNameStr,
_segmentZKMetadata);
removeSegmentFile();
return true;
}
@@ -1309,7 +1309,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
if (descriptor == null) {
return false;
}
- _realtimeTableDataManager.replaceConsumingSegment(_segmentNameStr);
+ _realtimeTableDataManager.replaceConsumingSegment(_segmentNameStr,
_segmentZKMetadata);
return true;
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 004755f47c..277fd5b910 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -71,6 +71,8 @@ import
org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.SegmentContext;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -691,7 +693,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
segmentName, _tableNameWithType);
if (isUpsertEnabled()) {
- handleUpsert(immutableSegment);
+ handleUpsert(immutableSegment, zkMetadata);
return;
}
@@ -730,10 +732,14 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
}
}
- private void handleUpsert(ImmutableSegment immutableSegment) {
+ private void handleUpsert(ImmutableSegment immutableSegment, @Nullable
SegmentZKMetadata zkMetadata) {
String segmentName = immutableSegment.getSegmentName();
_logger.info("Adding immutable segment: {} with upsert enabled",
segmentName);
+ // Set the ZK creation time so that the same creation time is used to
break comparison ties across replicas,
+ // ensuring data consistency across replicas.
+ setZkCreationTimeIfAvailable(immutableSegment, zkMetadata);
+
Integer partitionId = SegmentUtils.getSegmentPartitionId(segmentName,
_tableNameWithType, _helixManager, null);
Preconditions.checkNotNull(partitionId, "Failed to get partition id for
segment: " + segmentName
+ " (upsert-enabled table: " + _tableNameWithType + ")");
@@ -808,6 +814,22 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
registerSegment(segmentName, segmentDataManager);
}
+ /**
+ * Sets the ZK creation time in the segment metadata if available, to ensure
consistent
+ * creation times across replicas for upsert operations.
+ */
+ private void setZkCreationTimeIfAvailable(ImmutableSegment segment,
@Nullable SegmentZKMetadata zkMetadata) {
+ if (zkMetadata != null && zkMetadata.getCreationTime() > 0) {
+ SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
+ if (segmentMetadata instanceof SegmentMetadataImpl) {
+ SegmentMetadataImpl segmentMetadataImpl = (SegmentMetadataImpl)
segmentMetadata;
+ segmentMetadataImpl.setZkCreationTime(zkMetadata.getCreationTime());
+ _logger.info("Set ZK creation time {} for segment: {} in upsert
table", zkMetadata.getCreationTime(),
+ zkMetadata.getSegmentName());
+ }
+ }
+ }
+
/**
* Replaces the CONSUMING segment with a downloaded committed one.
*/
@@ -826,13 +848,26 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
/**
* Replaces the CONSUMING segment with the one sealed locally.
*/
+ @Deprecated
public void replaceConsumingSegment(String segmentName)
throws Exception {
+ replaceConsumingSegment(segmentName, null);
+ }
+
+ /**
+ * Replaces the CONSUMING segment with the one sealed locally.
+ * This overloaded method avoids extra ZK call when the caller already has
SegmentZKMetadata.
+ */
+ public void replaceConsumingSegment(String segmentName, @Nullable
SegmentZKMetadata zkMetadata)
+ throws Exception {
_logger.info("Replacing CONSUMING segment: {} with the one sealed
locally", segmentName);
File indexDir = new File(_indexDir, segmentName);
// Get a new index loading config with latest table config and schema to
load the segment
IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
- addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
_segmentOperationsThrottler));
+ ImmutableSegment immutableSegment =
+ ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
_segmentOperationsThrottler);
+
+ addSegment(immutableSegment, zkMetadata);
_logger.info("Replaced CONSUMING segment: {}", segmentName);
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 5dc2647935..3fbc30fce9 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -56,7 +56,9 @@ import org.apache.pinot.segment.local.utils.WatermarkUtils;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
+import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -1146,4 +1148,24 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
}
return Collections.emptySet();
}
+
+ /**
+ * Returns the creation time used to break upsert comparison ties.
+ * Prefers the ZooKeeper creation time, which the controller sets when creating a new consuming segment.
+ * This is used to ensure consistent creation time across replicas for upsert
+ * operations.
+ * @return ZK creation time in milliseconds, or the local index creation time if the ZK creation time is not set
+ */
+ protected long getAuthoritativeCreationTime(IndexSegment segment) {
+ SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
+ if (segmentMetadata instanceof SegmentMetadataImpl) {
+ SegmentMetadataImpl segmentMetadataImpl = (SegmentMetadataImpl)
segmentMetadata;
+ long zkCreationTime = segmentMetadataImpl.getZkCreationTime();
+ if (zkCreationTime != Long.MIN_VALUE) {
+ return zkCreationTime;
+ }
+ }
+ // Fall back to local creation time if ZK creation time is not set
+ return segmentMetadata.getIndexCreationTime();
+ }
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index 5552a6c65c..ad5058c703 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -136,8 +136,8 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
// current value, but the segment has a larger sequence number
(the segment is newer than the current
// segment).
if (comparisonResult > 0 || (comparisonResult == 0 &&
shouldReplaceOnComparisonTie(segmentName,
- currentSegmentName,
segment.getSegmentMetadata().getIndexCreationTime(),
-
currentSegment.getSegmentMetadata().getIndexCreationTime()))) {
+ currentSegmentName, getAuthoritativeCreationTime(segment),
+ getAuthoritativeCreationTime(currentSegment)))) {
replaceDocId(segment, validDocIds, queryableDocIds,
currentSegment, currentDocId, newDocId, recordInfo);
return new RecordLocation(segment, newDocId,
newComparisonValue);
} else {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
index a179a05fa7..9af3ec6c23 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
@@ -166,8 +166,8 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
// current value, but the segment has a larger sequence number
(the segment is newer than the current
// segment).
if (comparisonResult > 0 || (comparisonResult == 0 &&
shouldReplaceOnComparisonTie(segmentName,
- currentSegmentName,
segment.getSegmentMetadata().getIndexCreationTime(),
-
currentSegment.getSegmentMetadata().getIndexCreationTime()))) {
+ currentSegmentName, getAuthoritativeCreationTime(segment),
+ getAuthoritativeCreationTime(currentSegment)))) {
replaceDocId(segment, validDocIds, queryableDocIds,
currentSegment, currentDocId, newDocId, recordInfo);
return new RecordLocation(segment, newDocId,
newComparisonValue,
RecordLocation.incrementSegmentCount(currentDistinctSegmentCount));
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
index 31ea92cf89..6ebfd90b05 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
@@ -105,7 +105,9 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
invocation ->
primaryKeys.get(invocation.getArgument(0)).getValues()[0]);
when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
-
when(segmentMetadata.getIndexCreationTime()).thenReturn(System.currentTimeMillis());
+ long creationTimeMs = System.currentTimeMillis();
+ when(segmentMetadata.getIndexCreationTime()).thenReturn(creationTimeMs);
+ when(segmentMetadata.getZkCreationTime()).thenReturn(creationTimeMs);
if (primaryKeys != null) {
when(segmentMetadata.getTotalDocs()).thenReturn(primaryKeys.size());
}
@@ -133,6 +135,7 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
when(segmentMetadata.getIndexCreationTime()).thenReturn(creationTimeMs);
+ when(segmentMetadata.getZkCreationTime()).thenReturn(creationTimeMs);
when(segmentMetadata.getTotalDocs()).thenReturn(primaryKeys.size());
when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
return segment;
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index a059dcd27c..d86e1e4609 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -821,7 +821,9 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
invocation ->
primaryKeys.get(invocation.getArgument(0)).getValues()[0]);
when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
-
when(segmentMetadata.getIndexCreationTime()).thenReturn(System.currentTimeMillis());
+ long creationTimeMs = System.currentTimeMillis();
+ when(segmentMetadata.getIndexCreationTime()).thenReturn(creationTimeMs);
+ when(segmentMetadata.getZkCreationTime()).thenReturn(creationTimeMs);
when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
return segment;
}
@@ -846,6 +848,7 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
when(segmentMetadata.getIndexCreationTime()).thenReturn(creationTimeMs);
+ when(segmentMetadata.getZkCreationTime()).thenReturn(creationTimeMs);
when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
return segment;
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
index be5800769e..4313973fd9 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
@@ -78,6 +78,7 @@ public class SegmentMetadataImpl implements SegmentMetadata {
private final Schema _schema;
private long _crc = Long.MIN_VALUE;
private long _creationTime = Long.MIN_VALUE;
+ private long _zkCreationTime = Long.MIN_VALUE; // ZooKeeper creation time
for upsert consistency
private String _timeColumn;
private TimeUnit _timeUnit;
private Duration _timeGranularity;
@@ -149,6 +150,7 @@ public class SegmentMetadataImpl implements SegmentMetadata
{
_segmentName = segmentName;
_schema = schema;
_creationTime = creationTime;
+ _zkCreationTime = creationTime;
}
/**
@@ -380,6 +382,24 @@ public class SegmentMetadataImpl implements
SegmentMetadata {
return _creationTime;
}
+ /**
+ * Returns the ZooKeeper creation time for upsert consistency.
+ * This refers to the time set by controller while creating the consuming
segment. It is used to ensure consistent
+ * creation time across replicas for upsert operations.
+ * @return ZK creation time in milliseconds, or Long.MIN_VALUE if not set
+ */
+ public long getZkCreationTime() {
+ return _zkCreationTime;
+ }
+
+ /**
+ * Sets the ZooKeeper creation time for upsert consistency.
+ * @param zkCreationTime ZK creation time in milliseconds
+ */
+ public void setZkCreationTime(long zkCreationTime) {
+ _zkCreationTime = zkCreationTime;
+ }
+
@Override
public long getLastIndexedTimestamp() {
return Long.MIN_VALUE;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]