This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 50ad111437d Add upsert support for offline tables (#17789)
50ad111437d is described below
commit 50ad111437dc1f3c913e1900ae24d28ce21ffe96
Author: Xiang Fu <[email protected]>
AuthorDate: Sat Mar 7 22:36:50 2026 -0800
Add upsert support for offline tables (#17789)
Enable primary-key-based deduplication (upsert) for OFFLINE tables,
extending a capability previously limited to REALTIME tables. This
allows batch-ingested data to leverage the same upsert semantics.
Core changes:
- Add three-level comparison column fallback for resolving record
conflicts: configured comparison columns → time column → segment
creation/push time (via ConstantComparisonColumnReader)
- Extend OfflineTableDataManager with full upsert lifecycle: init,
addSegment, replaceSegment, getSegmentContexts, and shutdown
- Validate that offline upsert tables have SegmentPartitionConfig
to ensure partition-aware segment assignment
- Use zkMetadata-based partition ID lookup when available to avoid
redundant ZK reads
- Guard TTL code paths against empty comparison columns to prevent
IndexOutOfBoundsException in segment-creation-time mode
Refactoring:
- Move shared upsert methods (handleUpsert, replaceUpsertSegment,
registerSegment, setZkOperationTimeIfAvailable, getSegmentContexts,
isUpsertEnabled, getPartitionToPrimaryKeyCount) from
RealtimeTableDataManager into BaseTableDataManager
- Add isUpsertEnabled(), getTableUpsertMetadataManager(), and
getPartitionToPrimaryKeyCount() to the TableDataManager interface
with default implementations
- Remove instanceof checks in PrimaryKeyCount, TablesResource, and
SingleTableExecutionInfo in favor of interface methods
Testing:
- Add OfflineUpsertTableIntegrationTest covering dedup query results,
skipUpsert option, and segment replacement
- Update TableConfigUtilsTest for new validation rules
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../core/data/manager/BaseTableDataManager.java | 133 +++++++++
.../manager/offline/OfflineTableDataManager.java | 39 +++
.../manager/realtime/RealtimeTableDataManager.java | 133 +--------
.../query/executor/SingleTableExecutionInfo.java | 22 +-
.../tests/OfflineUpsertTableIntegrationTest.java | 306 +++++++++++++++++++++
.../local/data/manager/TableDataManager.java | 24 ++
.../upsert/BasePartitionUpsertMetadataManager.java | 51 ++--
.../upsert/BaseTableUpsertMetadataManager.java | 9 +-
...oncurrentMapPartitionUpsertMetadataManager.java | 4 +-
...nUpsertMetadataManagerForConsistentDeletes.java | 4 +-
.../pinot/segment/local/upsert/UpsertContext.java | 2 +-
.../pinot/segment/local/upsert/UpsertUtils.java | 36 +++
.../segment/local/utils/TableConfigUtils.java | 40 ++-
.../segment/local/utils/TableConfigUtilsTest.java | 9 +-
.../spi/index/metadata/SegmentMetadataImpl.java | 26 +-
.../server/api/resources/PrimaryKeyCount.java | 12 +-
.../pinot/server/api/resources/TablesResource.java | 8 +-
17 files changed, 656 insertions(+), 202 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 7458ca9d231..07dd67d376a 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -60,7 +60,9 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.common.utils.ExceptionUtils;
+import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.common.utils.config.TierConfigUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
@@ -78,6 +80,8 @@ import
org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
import
org.apache.pinot.segment.local.segment.index.loader.invertedindex.MultiColumnTextIndexHandler;
import org.apache.pinot.segment.local.startree.StarTreeBuilderUtils;
import
org.apache.pinot.segment.local.startree.v2.builder.StarTreeV2BuilderConfig;
+import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
+import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
import org.apache.pinot.segment.local.utils.SegmentOperationsThrottlerSet;
@@ -107,9 +111,11 @@ import
org.apache.pinot.spi.config.table.MultiColumnTextIndexConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -122,6 +128,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
protected final ConcurrentHashMap<String, SegmentDataManager>
_segmentDataManagerMap = new ConcurrentHashMap<>();
protected final ServerMetrics _serverMetrics = ServerMetrics.get();
+ protected TableUpsertMetadataManager _tableUpsertMetadataManager;
protected InstanceDataManagerConfig _instanceDataManagerConfig;
protected String _instanceId;
@@ -784,9 +791,135 @@ public abstract class BaseTableDataManager implements
TableDataManager {
Map<String, String> queryOptions) {
List<SegmentContext> segmentContexts = new
ArrayList<>(selectedSegments.size());
selectedSegments.forEach(s -> segmentContexts.add(new SegmentContext(s)));
+ if (isUpsertEnabled() && !QueryOptionsUtils.isSkipUpsert(queryOptions)) {
+ _tableUpsertMetadataManager.setSegmentContexts(segmentContexts,
queryOptions);
+ }
return segmentContexts;
}
+ @Override
+ public boolean isUpsertEnabled() {
+ return _tableUpsertMetadataManager != null;
+ }
+
+ @VisibleForTesting
+ @Override
+ public TableUpsertMetadataManager getTableUpsertMetadataManager() {
+ return _tableUpsertMetadataManager;
+ }
+
+ @Override
+ public Map<Integer, Long> getPartitionToPrimaryKeyCount() {
+ if (isUpsertEnabled()) {
+ return _tableUpsertMetadataManager.getPartitionToPrimaryKeyCount();
+ }
+ return Collections.emptyMap();
+ }
+
+ protected void handleUpsert(ImmutableSegment immutableSegment, @Nullable
SegmentZKMetadata zkMetadata) {
+ String segmentName = immutableSegment.getSegmentName();
+ _logger.info("Adding immutable segment: {} with upsert enabled",
segmentName);
+ Integer partitionId;
+ if (zkMetadata == null &&
TableNameBuilder.isOfflineTableResource(_tableNameWithType)) {
+ zkMetadata =
ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(),
_tableNameWithType,
+ segmentName);
+ }
+ setZkOperationTimeIfAvailable(immutableSegment, zkMetadata);
+ if (TableNameBuilder.isOfflineTableResource(_tableNameWithType)) {
+ Preconditions.checkState(zkMetadata != null,
+ "Failed to find segment ZK metadata for segment: %s of OFFLINE
table: %s", segmentName, _tableNameWithType);
+ partitionId = SegmentUtils.getSegmentPartitionId(zkMetadata, null);
+ } else {
+ partitionId = SegmentUtils.getSegmentPartitionId(segmentName,
_tableNameWithType, _helixManager, null);
+ }
+
+ Preconditions.checkNotNull(partitionId,
+ "Failed to get partition id for segment: %s (upsert-enabled table:
%s). "
+ + "Segment must follow a naming convention that encodes partition
id (e.g. LLCSegmentName, "
+ + "UploadedRealtimeSegmentName), or have partition metadata
configured via SegmentPartitionConfig.",
+ segmentName, _tableNameWithType);
+ PartitionUpsertMetadataManager partitionUpsertMetadataManager =
+ _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId);
+
+ _serverMetrics.addValueToTableGauge(_tableNameWithType,
ServerGauge.DOCUMENT_COUNT,
+ immutableSegment.getSegmentMetadata().getTotalDocs());
+ _serverMetrics.addValueToTableGauge(_tableNameWithType,
ServerGauge.SEGMENT_COUNT, 1L);
+ ImmutableSegmentDataManager newSegmentManager = new
ImmutableSegmentDataManager(immutableSegment);
+ if (partitionUpsertMetadataManager.isPreloading()) {
+ // Register segment after it is preloaded and has initialized its
validDocIds. The order of preloading and
+ // registering segment doesn't matter much as preloading happens before
the table partition is ready for queries.
+ partitionUpsertMetadataManager.preloadSegment(immutableSegment);
+ registerSegment(segmentName, newSegmentManager,
partitionUpsertMetadataManager);
+ _logger.info("Preloaded immutable segment: {} with upsert enabled",
segmentName);
+ return;
+ }
+ SegmentDataManager oldSegmentManager =
_segmentDataManagerMap.get(segmentName);
+ if (oldSegmentManager == null) {
+ // When adding a new segment, register it *before*
partitionUpsertMetadataManager.addSegment() fully initializes
+ // the validDocIds bitmap. This lets queries access the new segment
immediately while the bitmap is being built.
+ // Without early registration, docs in existing segments that get
invalidated by this new segment would become
+ // invisible to queries until addSegment() completes.
+ registerSegment(segmentName, newSegmentManager,
partitionUpsertMetadataManager);
+ partitionUpsertMetadataManager.trackNewlyAddedSegment(segmentName);
+ partitionUpsertMetadataManager.addSegment(immutableSegment);
+ _logger.info("Added new immutable segment: {} with upsert enabled",
segmentName);
+ } else {
+ replaceUpsertSegment(segmentName, oldSegmentManager, newSegmentManager,
partitionUpsertMetadataManager);
+ }
+ }
+
+ protected void replaceUpsertSegment(String segmentName, SegmentDataManager
oldSegmentManager,
+ ImmutableSegmentDataManager newSegmentManager,
PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
+ IndexSegment oldSegment = oldSegmentManager.getSegment();
+ ImmutableSegment immutableSegment = newSegmentManager.getSegment();
+ UpsertConfig.ConsistencyMode consistencyMode =
_tableUpsertMetadataManager.getContext().getConsistencyMode();
+ if (consistencyMode == UpsertConfig.ConsistencyMode.NONE) {
+ // When replacing a segment, register the new segment *after*
replaceSegment() finishes filling its validDocIds
+ // bitmap. Otherwise queries lose access to valid docs in the old
segment before the new bitmap is ready.
+ partitionUpsertMetadataManager.replaceSegment(immutableSegment,
oldSegment);
+ registerSegment(segmentName, newSegmentManager,
partitionUpsertMetadataManager);
+ } else {
+ // For consistency modes, keep both old and new segments visible to
queries during replacement so that queries
+ // can see the new updates in the new segment while the old segment's
validDocIds are still being updated.
+ // Register the new segment to the upsert metadata manager before making
it visible to queries so the upsert
+ // view is updated before any query can access it.
+ SegmentDataManager duoSegmentDataManager = new
DuoSegmentDataManager(newSegmentManager, oldSegmentManager);
+ registerSegment(segmentName, duoSegmentDataManager,
partitionUpsertMetadataManager);
+ partitionUpsertMetadataManager.replaceSegment(immutableSegment,
oldSegment);
+ registerSegment(segmentName, newSegmentManager,
partitionUpsertMetadataManager);
+ }
+ _logger.info("Replaced {} segment: {} with upsert enabled and consistency
mode: {}",
+ oldSegment instanceof ImmutableSegment ? "immutable" : "mutable",
segmentName, consistencyMode);
+ oldSegmentManager.offload();
+ releaseSegment(oldSegmentManager);
+ }
+
+ protected void registerSegment(String segmentName, SegmentDataManager
segmentDataManager,
+ @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager)
{
+ if (partitionUpsertMetadataManager != null) {
+
partitionUpsertMetadataManager.trackSegmentForUpsertView(segmentDataManager.getSegment());
+ }
+ registerSegment(segmentName, segmentDataManager);
+ }
+
+ protected void setZkOperationTimeIfAvailable(ImmutableSegment segment,
@Nullable SegmentZKMetadata zkMetadata) {
+ if (zkMetadata == null) {
+ return;
+ }
+ SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
+ if (segmentMetadata instanceof SegmentMetadataImpl) {
+ SegmentMetadataImpl segmentMetadataImpl = (SegmentMetadataImpl)
segmentMetadata;
+ if (zkMetadata.getCreationTime() > 0) {
+ segmentMetadataImpl.setZkCreationTime(zkMetadata.getCreationTime());
+ }
+ if (zkMetadata.getPushTime() > 0) {
+ segmentMetadataImpl.setZkPushTime(zkMetadata.getPushTime());
+ }
+ _logger.info("Set ZK creation time: {}, push time: {} for segment: {} in
upsert table",
+ zkMetadata.getCreationTime(), zkMetadata.getPushTime(),
zkMetadata.getSegmentName());
+ }
+ }
+
private void reloadSegments(List<SegmentDataManager> segmentDataManagers,
IndexLoadingConfig indexLoadingConfig,
boolean forceDownload, String reloadJobId)
throws Exception {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
index a99712ec3bb..23105d54a38 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
@@ -18,11 +18,19 @@
*/
package org.apache.pinot.core.data.manager.offline;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.core.data.manager.BaseTableDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
/**
@@ -33,6 +41,14 @@ public class OfflineTableDataManager extends
BaseTableDataManager {
@Override
protected void doInit() {
+ Pair<TableConfig, Schema> tableConfigAndSchema =
getCachedTableConfigAndSchema();
+ TableConfig tableConfig = tableConfigAndSchema.getLeft();
+ Schema schema = tableConfigAndSchema.getRight();
+ if (tableConfig.isUpsertEnabled()) {
+ _tableUpsertMetadataManager =
+
TableUpsertMetadataManagerFactory.create(_instanceDataManagerConfig.getUpsertConfig(),
tableConfig, schema,
+ this, _segmentOperationsThrottlerSet);
+ }
}
@Override
@@ -41,7 +57,17 @@ public class OfflineTableDataManager extends
BaseTableDataManager {
@Override
protected void doShutdown() {
+ if (_tableUpsertMetadataManager != null) {
+ _tableUpsertMetadataManager.stop();
+ }
releaseAndRemoveAllSegments();
+ if (_tableUpsertMetadataManager != null) {
+ try {
+ _tableUpsertMetadataManager.close();
+ } catch (IOException e) {
+ _logger.warn("Caught exception while closing upsert metadata manager",
e);
+ }
+ }
}
protected void doAddOnlineSegment(String segmentName)
@@ -57,6 +83,19 @@ public class OfflineTableDataManager extends
BaseTableDataManager {
}
}
+ @Override
+ public void addSegment(ImmutableSegment immutableSegment, @Nullable
SegmentZKMetadata zkMetadata) {
+ String segmentName = immutableSegment.getSegmentName();
+ Preconditions.checkState(!_shutDown,
+ "Table data manager is already shut down, cannot add segment: %s to
table: %s",
+ segmentName, _tableNameWithType);
+ if (isUpsertEnabled()) {
+ handleUpsert(immutableSegment, zkMetadata);
+ return;
+ }
+ super.addSegment(immutableSegment, zkMetadata);
+ }
+
@Override
public void addConsumingSegment(String segmentName) {
throw new UnsupportedOperationException("Cannot add CONSUMING segment to
OFFLINE table");
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index d4520a74284..3b606ebb949 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -53,10 +53,7 @@ import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.SegmentUtils;
-import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.data.manager.BaseTableDataManager;
-import org.apache.pinot.core.data.manager.DuoSegmentDataManager;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.util.PeerServerSegmentFinder;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
@@ -68,15 +65,11 @@ import
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
-import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory;
import org.apache.pinot.segment.local.utils.SchemaUtils;
import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
-import org.apache.pinot.segment.spi.SegmentContext;
-import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -150,7 +143,6 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
private IngestionDelayTracker _ingestionDelayTracker;
private TableDedupMetadataManager _tableDedupMetadataManager;
- private TableUpsertMetadataManager _tableUpsertMetadataManager;
private BooleanSupplier _isTableReadyToConsumeData;
private boolean _enforceConsumptionInOrder = false;
@@ -372,17 +364,6 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
_ingestionDelayTracker.stopTrackingPartition(segmentName);
}
- @Override
- public List<SegmentContext> getSegmentContexts(List<IndexSegment>
selectedSegments,
- Map<String, String> queryOptions) {
- List<SegmentContext> segmentContexts = new
ArrayList<>(selectedSegments.size());
- selectedSegments.forEach(s -> segmentContexts.add(new SegmentContext(s)));
- if (isUpsertEnabled() && !QueryOptionsUtils.isSkipUpsert(queryOptions)) {
- _tableUpsertMetadataManager.setSegmentContexts(segmentContexts,
queryOptions);
- }
- return segmentContexts;
- }
-
/**
* Returns thread safe StreamMetadataProvider which is shared across
different callers.
*/
@@ -450,10 +431,6 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
return _tableDedupMetadataManager != null;
}
- public boolean isUpsertEnabled() {
- return _tableUpsertMetadataManager != null;
- }
-
public boolean isPartialUpsertEnabled() {
return _tableUpsertMetadataManager != null
&& _tableUpsertMetadataManager.getContext().getUpsertMode() ==
UpsertConfig.Mode.PARTIAL;
@@ -779,104 +756,6 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
}
}
- private void handleUpsert(ImmutableSegment immutableSegment, @Nullable
SegmentZKMetadata zkMetadata) {
- String segmentName = immutableSegment.getSegmentName();
- _logger.info("Adding immutable segment: {} with upsert enabled",
segmentName);
-
- // Set the ZK creation time so that same creation time can be used to
break the comparison ties across replicas,
- // to ensure data consistency of replica.
- setZkCreationTimeIfAvailable(immutableSegment, zkMetadata);
-
- Integer partitionId = SegmentUtils.getSegmentPartitionId(segmentName,
_tableNameWithType, _helixManager, null);
- Preconditions.checkNotNull(partitionId, "Failed to get partition id for
segment: " + segmentName
- + " (upsert-enabled table: " + _tableNameWithType + ")");
- PartitionUpsertMetadataManager partitionUpsertMetadataManager =
- _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId);
-
- _serverMetrics.addValueToTableGauge(_tableNameWithType,
ServerGauge.DOCUMENT_COUNT,
- immutableSegment.getSegmentMetadata().getTotalDocs());
- _serverMetrics.addValueToTableGauge(_tableNameWithType,
ServerGauge.SEGMENT_COUNT, 1L);
- ImmutableSegmentDataManager newSegmentManager = new
ImmutableSegmentDataManager(immutableSegment);
- if (partitionUpsertMetadataManager.isPreloading()) {
- // Register segment after it is preloaded and has initialized its
validDocIds. The order of preloading and
- // registering segment doesn't matter much as preloading happens before
table partition is ready for queries.
- partitionUpsertMetadataManager.preloadSegment(immutableSegment);
- registerSegment(segmentName, newSegmentManager,
partitionUpsertMetadataManager);
- _logger.info("Preloaded immutable segment: {} with upsert enabled",
segmentName);
- return;
- }
- SegmentDataManager oldSegmentManager =
_segmentDataManagerMap.get(segmentName);
- if (oldSegmentManager == null) {
- // When adding a new segment, we should register it 'before' it is fully
initialized by
- // partitionUpsertMetadataManager. Because when processing docs in the
new segment, the docs in the other
- // segments may be invalidated, making the queries see less valid docs
than expected. We should let query
- // access the new segment asap even though its validDocId bitmap is
still being filled by
- // partitionUpsertMetadataManager.
- registerSegment(segmentName, newSegmentManager,
partitionUpsertMetadataManager);
- partitionUpsertMetadataManager.trackNewlyAddedSegment(segmentName);
- partitionUpsertMetadataManager.addSegment(immutableSegment);
- _logger.info("Added new immutable segment: {} with upsert enabled",
segmentName);
- } else {
- replaceUpsertSegment(segmentName, oldSegmentManager, newSegmentManager,
partitionUpsertMetadataManager);
- }
- }
-
- private void replaceUpsertSegment(String segmentName, SegmentDataManager
oldSegmentManager,
- ImmutableSegmentDataManager newSegmentManager,
PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
- // When replacing a segment, we should register the new segment 'after' it
is fully initialized by
- // partitionUpsertMetadataManager to fill up its validDocId bitmap.
Otherwise, the queries will lose the access
- // to the valid docs in the old segment immediately, but the validDocId
bitmap of the new segment is still
- // being filled by partitionUpsertMetadataManager, making the queries see
less valid docs than expected.
- IndexSegment oldSegment = oldSegmentManager.getSegment();
- ImmutableSegment immutableSegment = newSegmentManager.getSegment();
- UpsertConfig.ConsistencyMode consistencyMode =
_tableUpsertMetadataManager.getContext().getConsistencyMode();
- if (consistencyMode == UpsertConfig.ConsistencyMode.NONE) {
- partitionUpsertMetadataManager.replaceSegment(immutableSegment,
oldSegment);
- registerSegment(segmentName, newSegmentManager,
partitionUpsertMetadataManager);
- } else {
- // By default, when replacing a segment, the old segment is kept intact
and visible to query until the new
- // segment is registered as in the if-branch above. But the newly
ingested records will invalidate valid
- // docs in the new segment as the upsert metadata gets updated during
replacement, so the query will miss the
- // new updates in the new segment, until it's registered after the
replacement is done.
- // For consistent data view, we make both old and new segment visible to
the query and update both in place
- // when segment replacement and new data ingestion are happening in
parallel.
- SegmentDataManager duoSegmentDataManager = new
DuoSegmentDataManager(newSegmentManager, oldSegmentManager);
- registerSegment(segmentName, duoSegmentDataManager,
partitionUpsertMetadataManager);
- partitionUpsertMetadataManager.replaceSegment(immutableSegment,
oldSegment);
- registerSegment(segmentName, newSegmentManager,
partitionUpsertMetadataManager);
- }
- _logger.info("Replaced {} segment: {} with upsert enabled and consistency
mode: {}",
- oldSegment instanceof ImmutableSegment ? "immutable" : "mutable",
segmentName, consistencyMode);
- oldSegmentManager.offload();
- releaseSegment(oldSegmentManager);
- }
-
- private void registerSegment(String segmentName, SegmentDataManager
segmentDataManager,
- @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager)
{
- if (partitionUpsertMetadataManager != null) {
- // Register segment to the upsert metadata manager before registering it
to table manager, so that the upsert
- // metadata manger can update the upsert view before the segment becomes
visible to queries.
-
partitionUpsertMetadataManager.trackSegmentForUpsertView(segmentDataManager.getSegment());
- }
- registerSegment(segmentName, segmentDataManager);
- }
-
- /**
- * Sets the ZK creation time in the segment metadata if available, to ensure
consistent
- * creation times across replicas for upsert operations.
- */
- private void setZkCreationTimeIfAvailable(ImmutableSegment segment,
@Nullable SegmentZKMetadata zkMetadata) {
- if (zkMetadata != null && zkMetadata.getCreationTime() > 0) {
- SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
- if (segmentMetadata instanceof SegmentMetadataImpl) {
- SegmentMetadataImpl segmentMetadataImpl = (SegmentMetadataImpl)
segmentMetadata;
- segmentMetadataImpl.setZkCreationTime(zkMetadata.getCreationTime());
- _logger.info("Set ZK creation time {} for segment: {} in upsert
table", zkMetadata.getCreationTime(),
- zkMetadata.getSegmentName());
- }
- }
- }
-
/**
* Replaces the CONSUMING segment with a downloaded committed one.
*/
@@ -925,22 +804,12 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
return _instanceId;
}
- @VisibleForTesting
- public TableUpsertMetadataManager getTableUpsertMetadataManager() {
- return _tableUpsertMetadataManager;
- }
-
@VisibleForTesting
public TableDedupMetadataManager getTableDedupMetadataManager() {
return _tableDedupMetadataManager;
}
- /**
- * Retrieves a mapping of partition id to the primary key count for the
partition.
- * Supports both upsert and dedup enabled tables.
- *
- * @return A {@code Map} where keys are partition id and values are count of
primary keys for that specific partition.
- */
+ @Override
public Map<Integer, Long> getPartitionToPrimaryKeyCount() {
if (isUpsertEnabled()) {
return _tableUpsertMetadataManager.getPartitionToPrimaryKeyCount();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/SingleTableExecutionInfo.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/SingleTableExecutionInfo.java
index aba2be49adc..5c0949f017f 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/SingleTableExecutionInfo.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/SingleTableExecutionInfo.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.query.executor;
+import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -74,17 +75,18 @@ public class SingleTableExecutionInfo implements
TableExecutionInfo {
List<IndexSegment> indexSegments;
Map<IndexSegment, SegmentContext> providedSegmentContexts = null;
- if (!isUpsertTable(tableDataManager)) {
+ if (!tableDataManager.isUpsertEnabled()) {
segmentDataManagers = tableDataManager.acquireSegments(segmentsToQuery,
optionalSegments, notAcquiredSegments);
indexSegments = new ArrayList<>(segmentDataManagers.size());
for (SegmentDataManager segmentDataManager : segmentDataManagers) {
indexSegments.add(segmentDataManager.getSegment());
}
} else {
- RealtimeTableDataManager rtdm = (RealtimeTableDataManager)
tableDataManager;
- TableUpsertMetadataManager tumm = rtdm.getTableUpsertMetadataManager();
+ TableUpsertMetadataManager tumm =
tableDataManager.getTableUpsertMetadataManager();
+ Preconditions.checkState(tumm != null,
+ "TableUpsertMetadataManager is null for upsert-enabled table: %s",
tableNameWithType);
boolean isUsingConsistencyMode =
-
rtdm.getTableUpsertMetadataManager().getContext().getConsistencyMode() !=
UpsertConfig.ConsistencyMode.NONE;
+ tumm.getContext().getConsistencyMode() !=
UpsertConfig.ConsistencyMode.NONE;
if (isUsingConsistencyMode) {
tumm.lockForSegmentContexts();
}
@@ -128,18 +130,6 @@ public class SingleTableExecutionInfo implements
TableExecutionInfo {
segmentsToQuery, optionalSegments, notAcquiredSegments);
}
- private static boolean isUpsertTable(TableDataManager tableDataManager) {
- // For upsert table, the server can start to process newly added segments
before brokers can add those segments
- // into their routing tables, like newly created consuming segment or
newly uploaded segments. We should include
- // those segments in the list of segments for query to process on the
server, otherwise, the query will see less
- // than expected valid docs from the upsert table.
- if (tableDataManager instanceof RealtimeTableDataManager) {
- RealtimeTableDataManager rtdm = (RealtimeTableDataManager)
tableDataManager;
- return rtdm.isUpsertEnabled();
- }
- return false;
- }
-
private SingleTableExecutionInfo(TableDataManager tableDataManager,
List<SegmentDataManager> segmentDataManagers,
List<IndexSegment> indexSegments, Map<IndexSegment, SegmentContext>
providedSegmentContexts,
List<String> segmentsToQuery, List<String> optionalSegments,
List<String> notAcquiredSegments) {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineUpsertTableIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineUpsertTableIntegrationTest.java
new file mode 100644
index 00000000000..7dbb755745d
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineUpsertTableIntegrationTest.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.client.ResultSet;
+import org.apache.pinot.common.utils.TarCompressionUtils;
+import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
+import org.apache.pinot.spi.config.table.RoutingConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Integration test for offline table upsert support.
+ *
+ * Tests that OFFLINE tables with upsert enabled correctly deduplicate records
by primary key,
+ * keeping the latest record based on the comparison column (time column).
+ *
+ * Test data layout (t1 = 1671036400000, t2 = 1681036400000):
+ * Segment 1 (partition 0): playerId=100 (score=2000, ts=t1), playerId=101 (score=3000, ts=t1)
+ * Segment 2 (partition 0): playerId=100 (score=2500, ts=t2), playerId=102 (score=4000, ts=t1)
+ * Segment 3 (partition 0): playerId=101 (score=3500, ts=t2), playerId=102 (score=4500, ts=t2)
+ *
+ * After upsert dedup (latest by timestampInEpoch):
+ * playerId=100 -> score=2500 (from segment 2, ts=t2)
+ * playerId=101 -> score=3500 (from segment 3, ts=t2)
+ * playerId=102 -> score=4500 (from segment 3, ts=t2)
+ */
+public class OfflineUpsertTableIntegrationTest extends
BaseClusterIntegrationTest {
+ private static final String TABLE_NAME = "offlineUpsertTest";
+ private static final String PRIMARY_KEY_COL = "playerId";
+ private static final String TIME_COL_NAME = "timestampInEpoch";
+ private static final int NUM_PARTITIONS = 1;
+ private static final int TOTAL_RAW_RECORDS = 6;
+ private static final int UNIQUE_PRIMARY_KEYS = 3;
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+ // Start the Pinot cluster
+ startZk();
+ startController();
+ startBroker();
+ startServer();
+
+ // Create and upload schema
+ Schema schema = createUpsertSchema();
+ addSchema(schema);
+
+ // Create OFFLINE table config with upsert enabled
+ TableConfig tableConfig = createOfflineUpsertTableConfig();
+ addTableConfig(tableConfig);
+
+ // Build and upload segments with overlapping primary keys
+ buildAndUploadTestSegments(tableConfig, schema);
+
+ // Wait for all documents to load
+ waitForAllDocsLoaded(600_000L);
+ }
+
+ @AfterClass
+ public void tearDown()
+ throws IOException {
+ dropOfflineTable(TABLE_NAME);
+ stopServer();
+ stopBroker();
+ stopController();
+ stopZk();
+ FileUtils.deleteDirectory(_tempDir);
+ }
+
+ @Override
+ protected String getTableName() {
+ return TABLE_NAME;
+ }
+
+ @Nullable
+ @Override
+ protected String getTimeColumnName() {
+ return TIME_COL_NAME;
+ }
+
+ @Override
+ protected long getCountStarResult() {
+ return UNIQUE_PRIMARY_KEYS;
+ }
+
+ @Override
+ protected void waitForAllDocsLoaded(long timeoutMs)
+ throws Exception {
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ return queryCountStarWithoutUpsert() == TOTAL_RAW_RECORDS;
+ } catch (Exception e) {
+ return null;
+ }
+ }, 100L, timeoutMs, "Failed to load all documents");
+ assertEquals(getCurrentCountStarResult(), getCountStarResult());
+ }
+
+ /**
+ * Tests that COUNT(*) returns only unique primary keys (deduplication is
working).
+ */
+ @Test
+ public void testUpsertQueryResults()
+ throws Exception {
+ // With upsert: should return only 3 unique primary keys
+ long upsertCount = getCurrentCountStarResult();
+ assertEquals(upsertCount, UNIQUE_PRIMARY_KEYS,
+ "Expected " + UNIQUE_PRIMARY_KEYS + " unique records after upsert
dedup");
+
+ // Without upsert: should return all 6 raw records
+ long rawCount = queryCountStarWithoutUpsert();
+ assertEquals(rawCount, TOTAL_RAW_RECORDS,
+ "Expected " + TOTAL_RAW_RECORDS + " raw records with skipUpsert=true");
+
+ // Verify the latest records are returned (by checking scores)
+ ResultSet rs = getPinotConnection().execute(
+ "SELECT playerId, score FROM " + TABLE_NAME + " ORDER BY
playerId").getResultSet(0);
+ assertEquals(rs.getRowCount(), UNIQUE_PRIMARY_KEYS);
+
+ // playerId=100 -> score=2500 (latest from segment 2)
+ assertEquals(rs.getInt(0, 0), 100);
+ assertEquals(rs.getFloat(0, 1), 2500.0f, 0.01f);
+
+ // playerId=101 -> score=3500 (latest from segment 3)
+ assertEquals(rs.getInt(1, 0), 101);
+ assertEquals(rs.getFloat(1, 1), 3500.0f, 0.01f);
+
+ // playerId=102 -> score=4500 (latest from segment 3)
+ assertEquals(rs.getInt(2, 0), 102);
+ assertEquals(rs.getFloat(2, 1), 4500.0f, 0.01f);
+ }
+
+ /**
+ * Tests that uploading a new segment with updated records replaces older
values.
+ */
+ @Test(dependsOnMethods = "testUpsertQueryResults")
+ public void testSegmentReplacement()
+ throws Exception {
+ Schema schema = createUpsertSchema();
+ TableConfig tableConfig = getOfflineTableConfig();
+
+ // Build a new segment with updated values for playerId=100
+ List<GenericRow> rows = new ArrayList<>();
+ GenericRow row = new GenericRow();
+ row.putValue(PRIMARY_KEY_COL, 100);
+ row.putValue("name", "UpdatedPlayer");
+ row.putValue("game", "chess");
+ row.putValue("score", 9999.0f);
+ row.putValue(TIME_COL_NAME, 1691036400000L);
+ rows.add(row);
+
+ File newSegmentDir = new File(_tempDir, "newSegmentDir");
+ File newTarDir = new File(_tempDir, "newTarDir");
+ TestUtils.ensureDirectoriesExistAndEmpty(newSegmentDir, newTarDir);
+ buildSegment(tableConfig, schema, "segment_update_0", rows, newSegmentDir,
newTarDir);
+ uploadSegments(TABLE_NAME, newTarDir);
+
+ // Wait for the new segment to load (7 total raw records now)
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ return queryCountStarWithoutUpsert() == TOTAL_RAW_RECORDS + 1;
+ } catch (Exception e) {
+ return null;
+ }
+ }, 100L, 600_000L, "Failed to load updated segment");
+
+ // Verify upsert still returns 3 unique primary keys
+ assertEquals(getCurrentCountStarResult(), UNIQUE_PRIMARY_KEYS);
+
+ // Verify playerId=100 now has the updated score
+ ResultSet rs = getPinotConnection().execute(
+ "SELECT score FROM " + TABLE_NAME + " WHERE playerId =
100").getResultSet(0);
+ assertEquals(rs.getRowCount(), 1);
+ assertEquals(rs.getFloat(0, 0), 9999.0f, 0.01f);
+ }
+
+ private Schema createUpsertSchema() {
+ return new Schema.SchemaBuilder()
+ .setSchemaName(TABLE_NAME)
+ .addSingleValueDimension(PRIMARY_KEY_COL, FieldSpec.DataType.INT)
+ .addSingleValueDimension("name", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("game", FieldSpec.DataType.STRING)
+ .addMetric("score", FieldSpec.DataType.FLOAT)
+ .addDateTime(TIME_COL_NAME, FieldSpec.DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+ .setPrimaryKeyColumns(List.of(PRIMARY_KEY_COL))
+ .build();
+ }
+
+ private TableConfig createOfflineUpsertTableConfig() {
+ UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+
+ Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new
HashMap<>();
+ columnPartitionConfigMap.put(PRIMARY_KEY_COL, new
ColumnPartitionConfig("Murmur", NUM_PARTITIONS));
+
+ return new TableConfigBuilder(TableType.OFFLINE)
+ .setTableName(TABLE_NAME)
+ .setTimeColumnName(TIME_COL_NAME)
+ .setUpsertConfig(upsertConfig)
+ .setRoutingConfig(new RoutingConfig(null, null,
+ RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+ .setSegmentPartitionConfig(new
SegmentPartitionConfig(columnPartitionConfigMap))
+ .setReplicaGroupStrategyConfig(new
ReplicaGroupStrategyConfig(PRIMARY_KEY_COL, 1))
+ .build();
+ }
+
+ private void buildAndUploadTestSegments(TableConfig tableConfig, Schema
schema)
+ throws Exception {
+ // Segment 1: playerId=100 (score=2000, ts=1671036400000), playerId=101
(score=3000, ts=1671036400000)
+ List<GenericRow> segment1Rows = new ArrayList<>();
+ segment1Rows.add(createRow(100, "Alice", "chess", 2000.0f,
1671036400000L));
+ segment1Rows.add(createRow(101, "Bob", "chess", 3000.0f, 1671036400000L));
+ buildSegment(tableConfig, schema, "segment_0", segment1Rows, _segmentDir,
_tarDir);
+
+ // Segment 2: playerId=100 (score=2500, ts=1681036400000), playerId=102
(score=4000, ts=1671036400000)
+ List<GenericRow> segment2Rows = new ArrayList<>();
+ segment2Rows.add(createRow(100, "Alice", "chess", 2500.0f,
1681036400000L));
+ segment2Rows.add(createRow(102, "Charlie", "chess", 4000.0f,
1671036400000L));
+ buildSegment(tableConfig, schema, "segment_1", segment2Rows, _segmentDir,
_tarDir);
+
+ // Segment 3: playerId=101 (score=3500, ts=1681036400000), playerId=102
(score=4500, ts=1681036400000)
+ List<GenericRow> segment3Rows = new ArrayList<>();
+ segment3Rows.add(createRow(101, "Bob", "chess", 3500.0f, 1681036400000L));
+ segment3Rows.add(createRow(102, "Charlie", "chess", 4500.0f,
1681036400000L));
+ buildSegment(tableConfig, schema, "segment_2", segment3Rows, _segmentDir,
_tarDir);
+
+ uploadSegments(TABLE_NAME, _tarDir);
+ }
+
+ private GenericRow createRow(int playerId, String name, String game, float
score, long timestamp) {
+ GenericRow row = new GenericRow();
+ row.putValue(PRIMARY_KEY_COL, playerId);
+ row.putValue("name", name);
+ row.putValue("game", game);
+ row.putValue("score", score);
+ row.putValue(TIME_COL_NAME, timestamp);
+ return row;
+ }
+
+ private void buildSegment(TableConfig tableConfig, Schema schema, String
segmentName,
+ List<GenericRow> rows, File segmentDir, File tarDir)
+ throws Exception {
+ SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig,
schema);
+ config.setOutDir(segmentDir.getPath());
+ config.setTableName(tableConfig.getTableName());
+ config.setSegmentName(segmentName);
+
+ SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
+ RecordReader recordReader = new GenericRowRecordReader(rows);
+ driver.init(config, recordReader);
+ driver.build();
+
+ File indexDir = new File(segmentDir, segmentName);
+ File segmentTarFile = new File(tarDir, segmentName +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ TarCompressionUtils.createCompressedTarFile(indexDir, segmentTarFile);
+ }
+
+ private long queryCountStarWithoutUpsert() {
+ return getPinotConnection().execute(
+ "SELECT COUNT(*) FROM " + TABLE_NAME + "
OPTION(skipUpsert=true)").getResultSet(0).getLong(0);
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index b52a69ab363..903eeb9aba3 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.data.manager;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import java.io.File;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -32,6 +33,7 @@ import org.apache.helix.HelixManager;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.local.utils.SegmentOperationsThrottlerSet;
import org.apache.pinot.segment.local.utils.SegmentReloadSemaphore;
@@ -358,4 +360,26 @@ public interface TableDataManager {
* @return List of {@link StaleSegment} with segment names and reason why it
is stale
*/
List<StaleSegment> getStaleSegments();
+
+ /**
+ * Returns whether upsert is enabled for this table.
+ */
+ default boolean isUpsertEnabled() {
+ return false;
+ }
+
+ /**
+ * Returns the table upsert metadata manager if upsert is enabled, null
otherwise.
+ */
+ @Nullable
+ default TableUpsertMetadataManager getTableUpsertMetadataManager() {
+ return null;
+ }
+
+ /**
+ * Returns a mapping of partition id to primary key count. Supports both
upsert and dedup enabled tables.
+ */
+ default Map<Integer, Long> getPartitionToPrimaryKeyCount() {
+ return Collections.emptyMap();
+ }
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index ab414dd6ede..aa43d89f819 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -62,10 +62,12 @@ import
org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.apache.pinot.spi.utils.BooleanUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.roaringbitmap.PeekableIntIterator;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
import org.slf4j.Logger;
@@ -79,6 +81,7 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
protected static final double TTL_WATERMARK_NOT_SET =
Double.NEGATIVE_INFINITY;
protected final String _tableNameWithType;
+ protected final TableType _tableType;
protected final int _partitionId;
protected final UpsertContext _context;
protected final List<String> _primaryKeyColumns;
@@ -138,6 +141,7 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
protected BasePartitionUpsertMetadataManager(String tableNameWithType, int
partitionId, UpsertContext context) {
_tableNameWithType = tableNameWithType;
+ _tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
_partitionId = partitionId;
_context = context;
_primaryKeyColumns = context.getPrimaryKeyColumns();
@@ -299,6 +303,18 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
}
}
+ /**
+ * Creates a RecordInfoReader for the given segment. When comparison columns
are configured, reads comparison values
+ * from the columns. When comparison columns are empty, uses segment
creation time as the comparison value.
+ */
+ protected UpsertUtils.RecordInfoReader createRecordInfoReader(IndexSegment
segment) {
+ if (_comparisonColumns.isEmpty()) {
+ long segmentCreationTime = getAuthoritativeUpdateOrCreationTime(segment);
+ return new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
segmentCreationTime, _deleteRecordColumn);
+ }
+ return new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
_comparisonColumns, _deleteRecordColumn);
+ }
+
protected boolean isTTLEnabled() {
return _metadataTTL > 0 || _deletedKeysTTL > 0;
}
@@ -351,7 +367,7 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
protected void doAddSegment(ImmutableSegmentImpl segment) {
String segmentName = segment.getSegmentName();
_logger.info("Adding segment: {}, current primary key count: {}",
segmentName, getNumPrimaryKeys());
- if (isTTLEnabled()) {
+ if (isTTLEnabled() && !_comparisonColumns.isEmpty()) {
double maxComparisonValue = getMaxComparisonValue(segment);
_largestSeenComparisonValue.getAndUpdate(v -> Math.max(v,
maxComparisonValue));
if (isOutOfMetadataTTL(maxComparisonValue) &&
skipAddSegmentOutOfTTL(segment)) {
@@ -362,8 +378,7 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
if (!_enableSnapshot) {
deleteSnapshot(segment);
}
- try (UpsertUtils.RecordInfoReader recordInfoReader = new
UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
- _comparisonColumns, _deleteRecordColumn)) {
+ try (UpsertUtils.RecordInfoReader recordInfoReader =
createRecordInfoReader(segment)) {
Iterator<RecordInfo> recordInfoIterator =
UpsertUtils.getRecordInfoIterator(recordInfoReader,
segment.getSegmentMetadata().getTotalDocs());
addSegment(segment, null, null, recordInfoIterator);
@@ -427,15 +442,14 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
segment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(), null);
return;
}
- if (isTTLEnabled()) {
+ if (isTTLEnabled() && !_comparisonColumns.isEmpty()) {
double maxComparisonValue = getMaxComparisonValue(segment);
_largestSeenComparisonValue.getAndUpdate(v -> Math.max(v,
maxComparisonValue));
if (isOutOfMetadataTTL(maxComparisonValue) &&
skipPreloadSegmentOutOfTTL(segment, validDocIds)) {
return;
}
}
- try (UpsertUtils.RecordInfoReader recordInfoReader = new
UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
- _comparisonColumns, _deleteRecordColumn)) {
+ try (UpsertUtils.RecordInfoReader recordInfoReader =
createRecordInfoReader(segment)) {
doPreloadSegment(segment, null, null,
UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIds));
} catch (Exception e) {
throw new RuntimeException(
@@ -600,14 +614,13 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
replaceSegment(segment, null, null, null, oldSegment);
return;
}
- if (isTTLEnabled()) {
+ if (isTTLEnabled() && !_comparisonColumns.isEmpty()) {
double maxComparisonValue = getMaxComparisonValue(segment);
_largestSeenComparisonValue.getAndUpdate(v -> Math.max(v,
maxComparisonValue));
// Segment might be uploaded directly to the table to replace an old
segment. So update the TTL watermark but
// we can't skip segment even if it's out of TTL as its validDocIds
bitmap is not updated yet.
}
- try (UpsertUtils.RecordInfoReader recordInfoReader = new
UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
- _comparisonColumns, _deleteRecordColumn)) {
+ try (UpsertUtils.RecordInfoReader recordInfoReader =
createRecordInfoReader(segment)) {
Iterator<RecordInfo> recordInfoIterator =
UpsertUtils.getRecordInfoIterator(recordInfoReader,
segment.getSegmentMetadata().getTotalDocs());
replaceSegment(segment, null, null, recordInfoIterator, oldSegment);
@@ -787,7 +800,7 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
try {
// Skip removing the upsert metadata of segment that is out of metadata
TTL. The expired metadata is removed
// while creating new consuming segment in batches.
- if (isOutOfMetadataTTL(segment)) {
+ if (!_comparisonColumns.isEmpty() && isOutOfMetadataTTL(segment)) {
_logger.info("Skip removing segment: {} because it's out of TTL",
segmentName);
} else {
doRemoveSegment(segment);
@@ -1290,16 +1303,22 @@ public abstract class
BasePartitionUpsertMetadataManager implements PartitionUps
}
/**
- * Returns the ZooKeeper creation time for upsert consistency.
- * This refers to the time set by the controller when creating new consuming
segment.
- * This is used to ensure consistent creation time across replicas for upsert
- * operations.
- * @return ZK creation time in milliseconds, or Long.MIN_VALUE if not set
+ * Returns the ZooKeeper update time for upsert consistency.
+ * For a realtime table, this is the time set by the controller when creating a new consuming segment.
+ * For an offline table, this is the segment push time, falling back to the ZK creation time when the
+ * push time is not set.
+ * This is used to ensure a consistent comparison time across replicas for upsert operations.
+ * @return ZK push time or creation time in milliseconds, or Long.MIN_VALUE if not set
- protected long getAuthoritativeCreationTime(IndexSegment segment) {
+ protected long getAuthoritativeUpdateOrCreationTime(IndexSegment segment) {
SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
if (segmentMetadata instanceof SegmentMetadataImpl) {
SegmentMetadataImpl segmentMetadataImpl = (SegmentMetadataImpl)
segmentMetadata;
+ if (_tableType == TableType.OFFLINE) {
+ long zkPushTime = segmentMetadataImpl.getZkPushTime();
+ if (zkPushTime != Long.MIN_VALUE) {
+ return zkPushTime;
+ }
+ }
long zkCreationTime = segmentMetadataImpl.getZkCreationTime();
if (zkCreationTime != Long.MIN_VALUE) {
return zkCreationTime;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 0f268e6168a..fed81eafc92 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -19,6 +19,7 @@
package org.apache.pinot.segment.local.upsert;
import com.google.common.base.Preconditions;
+import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
@@ -59,7 +60,13 @@ public abstract class BaseTableUpsertMetadataManager
implements TableUpsertMetad
List<String> comparisonColumns = upsertConfig.getComparisonColumns();
if (comparisonColumns == null) {
- comparisonColumns =
List.of(tableConfig.getValidationConfig().getTimeColumnName());
+ String timeColumnName =
tableConfig.getValidationConfig().getTimeColumnName();
+ if (timeColumnName != null) {
+ comparisonColumns = List.of(timeColumnName);
+ } else {
+ // No comparison column and no time column: use segment creation time
for comparison
+ comparisonColumns = Collections.emptyList();
+ }
}
PartialUpsertHandler partialUpsertHandler = null;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index cd93d99d110..6edc1020150 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -145,8 +145,8 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
// current value, but the segment has a larger sequence number
(the segment is newer than the current
// segment).
if (comparisonResult > 0 || (comparisonResult == 0 &&
shouldReplaceOnComparisonTie(segmentName,
- currentSegmentName, getAuthoritativeCreationTime(segment),
- getAuthoritativeCreationTime(currentSegment)))) {
+ currentSegmentName,
getAuthoritativeUpdateOrCreationTime(segment),
+ getAuthoritativeUpdateOrCreationTime(currentSegment)))) {
replaceDocId(segment, validDocIds, queryableDocIds,
currentSegment, currentDocId, newDocId, recordInfo);
if (currentSegment != segment) {
_previousKeyToRecordLocationMap.put(primaryKey,
currentRecordLocation);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
index 9699547d3db..6e42097ee9d 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
@@ -180,8 +180,8 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
// current value, but the segment has a larger sequence number
(the segment is newer than the current
// segment).
if (comparisonResult > 0 || (comparisonResult == 0 &&
shouldReplaceOnComparisonTie(segmentName,
- currentSegmentName, getAuthoritativeCreationTime(segment),
- getAuthoritativeCreationTime(currentSegment)))) {
+ currentSegmentName,
getAuthoritativeUpdateOrCreationTime(segment),
+ getAuthoritativeUpdateOrCreationTime(currentSegment)))) {
replaceDocId(segment, validDocIds, queryableDocIds,
currentSegment, currentDocId, newDocId, recordInfo);
return new RecordLocation(segment, newDocId,
newComparisonValue,
RecordLocation.incrementSegmentCount(currentDistinctSegmentCount));
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
index d576e325979..6ef76c8a66d 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
@@ -352,7 +352,7 @@ public class UpsertContext {
Preconditions.checkState(_schema != null, "Schema must be set");
Preconditions.checkState(CollectionUtils.isNotEmpty(_primaryKeyColumns),
"Primary key columns must be set");
Preconditions.checkState(_hashFunction != null, "Hash function must be
set");
- Preconditions.checkState(CollectionUtils.isNotEmpty(_comparisonColumns),
"Comparison columns must be set");
+ Preconditions.checkState(_comparisonColumns != null, "Comparison columns
must be set");
Preconditions.checkState(_consistencyMode != null, "Consistency mode
must be set");
if (_tableIndexDir == null) {
Preconditions.checkState(_tableDataManager != null, "Either table data
manager or table index dir must be set");
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
index 42c20f8aadb..bedea992fd6 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
@@ -185,6 +185,21 @@ public class UpsertUtils {
}
}
+ /**
+ * Constructor that uses a constant comparison value for all records.
+ * Used when no comparison columns are configured and segment creation
time is used as the comparison value.
+ */
+ public RecordInfoReader(IndexSegment segment, List<String>
primaryKeyColumns,
+ Comparable constantComparisonValue, @Nullable String
deleteRecordColumn) {
+ _primaryKeyReader = new PrimaryKeyReader(segment, primaryKeyColumns);
+ _comparisonColumnReader = new
ConstantComparisonColumnReader(constantComparisonValue);
+ if (deleteRecordColumn != null) {
+ _deleteRecordColumnReader = new PinotSegmentColumnReader(segment,
deleteRecordColumn);
+ } else {
+ _deleteRecordColumnReader = null;
+ }
+ }
+
public RecordInfo getRecordInfo(int docId) {
PrimaryKey primaryKey = _primaryKeyReader.getPrimaryKey(docId);
Comparable comparisonValue =
_comparisonColumnReader.getComparisonValue(docId);
@@ -266,4 +281,25 @@ public class UpsertUtils {
}
}
}
+
+ /**
+ * A comparison column reader that returns a constant value for all records.
+ * Used when no comparison columns are configured and segment creation time
is used as the comparison value.
+ */
+ public static class ConstantComparisonColumnReader implements
ComparisonColumnReader {
+ private final Comparable _constantValue;
+
+ public ConstantComparisonColumnReader(Comparable constantValue) {
+ _constantValue = constantValue;
+ }
+
+ @Override
+ public Comparable getComparisonValue(int docId) {
+ return _constantValue;
+ }
+
+ @Override
+ public void close() {
+ }
+ }
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 0947c5f6938..fefbf2efa96 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -766,7 +766,7 @@ public final class TableConfigUtils {
/**
* Validates the upsert-related configurations
- * - check table type is realtime
+ * - check table type supports the configured mode
* - the primary key exists on the schema
* - strict replica-group is configured for routing type
* - consumer type must be low-level
@@ -786,9 +786,19 @@ public final class TableConfigUtils {
// check both upsert and dedup are not enabled simultaneously
Preconditions.checkState(!(isUpsertEnabled && isDedupEnabled),
"A table can have either Upsert or Dedup enabled, but not both");
- // check table type is realtime
- Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
- "Upsert/Dedup table is for realtime table only.");
+ if (tableConfig.getTableType() == TableType.OFFLINE) {
+ Preconditions.checkState(isUpsertEnabled && !isDedupEnabled,
+ "Dedup is not supported for OFFLINE table. Only upsert is supported
for OFFLINE table");
+ // Offline upsert tables require segment partition config so that segments are assigned to servers
+ // based on partition, ensuring all segments of a partition land on the same server so that
+ // primary-key conflict resolution (upsert deduplication) sees every record of that partition.
+ IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+ SegmentPartitionConfig segmentPartitionConfig =
+ indexingConfig != null ? indexingConfig.getSegmentPartitionConfig()
: null;
+ Preconditions.checkState(
+ segmentPartitionConfig != null &&
MapUtils.isNotEmpty(segmentPartitionConfig.getColumnPartitionMap()),
+ "Offline upsert table must have segment partition config to ensure
correct partition-based "
+ + "segment assignment. Configure segmentPartitionConfig in the
indexingConfig.");
+ }
// primary key exists
Preconditions.checkState(CollectionUtils.isNotEmpty(schema.getPrimaryKeyColumns()),
"Upsert/Dedup table must have primary key columns in the schema");
@@ -804,10 +814,12 @@ public final class TableConfigUtils {
Preconditions.checkState(
tableConfig.getRoutingConfig() != null &&
isRoutingStrategyAllowedForUpsert(tableConfig.getRoutingConfig()),
"Upsert/Dedup table must use strict replica-group (i.e.
strictReplicaGroup) based routing");
-
Preconditions.checkState(tableConfig.getTenantConfig().getTagOverrideConfig()
== null || (
-
tableConfig.getTenantConfig().getTagOverrideConfig().getRealtimeConsuming() ==
null
- &&
tableConfig.getTenantConfig().getTagOverrideConfig().getRealtimeCompleted() ==
null),
- "Invalid tenant tag override used for Upsert/Dedup table");
+ if (tableConfig.getTableType() == TableType.REALTIME) {
+
Preconditions.checkState(tableConfig.getTenantConfig().getTagOverrideConfig()
== null || (
+
tableConfig.getTenantConfig().getTagOverrideConfig().getRealtimeConsuming() ==
null
+ &&
tableConfig.getTenantConfig().getTagOverrideConfig().getRealtimeCompleted() ==
null),
+ "Invalid tenant tag override used for Upsert/Dedup table");
+ }
// specifically for upsert
UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
@@ -907,10 +919,12 @@ public final class TableConfigUtils {
}
}
- Preconditions.checkState(
- tableConfig.getInstanceAssignmentConfigMap() == null ||
!tableConfig.getInstanceAssignmentConfigMap()
- .containsKey(InstancePartitionsType.COMPLETED.name()),
- "COMPLETED instance partitions can't be configured for upsert / dedup
tables");
+ if (tableConfig.getTableType() == TableType.REALTIME) {
+ Preconditions.checkState(
+ tableConfig.getInstanceAssignmentConfigMap() == null ||
!tableConfig.getInstanceAssignmentConfigMap()
+ .containsKey(InstancePartitionsType.COMPLETED.name()),
+ "COMPLETED instance partitions can't be configured for upsert /
dedup tables");
+ }
validateAggregateMetricsForUpsertConfig(tableConfig);
validateTTLForUpsertConfig(tableConfig, schema);
validateTTLForDedupConfig(tableConfig, schema);
@@ -949,6 +963,8 @@ public final class TableConfigUtils {
comparisonColumn, comparisonColumnDataType);
} else {
String comparisonColumn =
tableConfig.getValidationConfig().getTimeColumnName();
+ Preconditions.checkState(comparisonColumn != null,
+ "MetadataTTL / DeletedKeysTTL requires either a comparison column or
a time column to be configured");
DataType comparisonColumnDataType =
schema.getFieldSpecFor(comparisonColumn).getDataType();
Preconditions.checkState(isValidTimeComparisonType(comparisonColumnDataType),
"MetadataTTL / DeletedKeysTTL must have time column: %s in numeric
type, found: %s",
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 0194c45633c..4bcb84bed09 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1904,6 +1904,7 @@ public class TableConfigUtilsTest {
.addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
.addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
.build();
+ // OFFLINE table should fail because dedup is not yet supported for
offline tables
TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setTimeColumnName(TIME_COLUMN)
.setDedupConfig(new DedupConfig())
@@ -1912,7 +1913,8 @@ public class TableConfigUtilsTest {
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
fail();
} catch (IllegalStateException e) {
- assertEquals(e.getMessage(), "Upsert/Dedup table is for realtime table
only.");
+ assertEquals(e.getMessage(), "Dedup is not supported for OFFLINE table.
Only upsert is supported for OFFLINE"
+ + " table");
}
tableConfig =
@@ -2108,6 +2110,7 @@ public class TableConfigUtilsTest {
.addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
.addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
.build();
+ // OFFLINE table without segment partition config should fail with
partition config error
UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setUpsertConfig(upsertConfig)
@@ -2117,7 +2120,9 @@ public class TableConfigUtilsTest {
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
fail();
} catch (IllegalStateException e) {
- assertEquals(e.getMessage(), "Upsert/Dedup table is for realtime table
only.");
+ assertEquals(e.getMessage(),
+ "Offline upsert table must have segment partition config to ensure
correct partition-based "
+ + "segment assignment. Configure segmentPartitionConfig in the
indexingConfig.");
}
tableConfig =
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
index 1f3a0c9a52e..8a8394c7f32 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
@@ -81,6 +81,7 @@ public class SegmentMetadataImpl implements SegmentMetadata {
private long _dataCrc = Long.MIN_VALUE;
private long _creationTime = Long.MIN_VALUE;
private long _zkCreationTime = Long.MIN_VALUE; // ZooKeeper creation time
for upsert consistency
+ private long _zkPushTime = Long.MIN_VALUE; // ZooKeeper push time for upsert
consistency
private String _timeColumn;
private TimeUnit _timeUnit;
private Duration _timeGranularity;
@@ -412,9 +413,10 @@ public class SegmentMetadataImpl implements
SegmentMetadata {
/**
* Returns the ZooKeeper creation time for upsert consistency.
- * This refers to the time set by controller while creating the consuming
segment. It is used to ensure consistent
- * creation time across replicas for upsert operations.
- * @return ZK creation time in milliseconds, or Long.MIN_VALUE if not set
+ * For REALTIME tables, this is set by the controller when the consuming
segment is created, ensuring consistent
+ * creation time across replicas. For segments loaded from disk, this
returns {@code Long.MIN_VALUE} until
+ * {@link #setZkCreationTime(long)} is explicitly called (e.g. from ZK
metadata during segment loading).
+ * @return ZK creation time in milliseconds, or {@code Long.MIN_VALUE} if
not explicitly set
*/
public long getZkCreationTime() {
return _zkCreationTime;
@@ -428,6 +430,24 @@ public class SegmentMetadataImpl implements
SegmentMetadata {
_zkCreationTime = zkCreationTime;
}
+ /**
+ * Returns the ZooKeeper push time for upsert consistency.
+   * This refers to the time set by the controller when the segment is pushed. It is used to ensure
+   * consistent push time across replicas for upsert operations.
+ * @return ZK push time in milliseconds, or Long.MIN_VALUE if not set
+ */
+ public long getZkPushTime() {
+ return _zkPushTime;
+ }
+
+ /**
+ * Sets the ZooKeeper push time for upsert consistency.
+ * @param zkPushTime ZK push time in milliseconds
+ */
+ public void setZkPushTime(long zkPushTime) {
+ _zkPushTime = zkPushTime;
+ }
+
@Override
public long getLastIndexedTimestamp() {
return Long.MIN_VALUE;
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/PrimaryKeyCount.java
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/PrimaryKeyCount.java
index e45881a5e88..0914638fa71 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/PrimaryKeyCount.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/PrimaryKeyCount.java
@@ -24,7 +24,6 @@ import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.restlet.resources.PrimaryKeyCountInfo;
import org.apache.pinot.core.data.manager.InstanceDataManager;
-import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,14 +59,9 @@ public class PrimaryKeyCount {
LOGGER.warn("TableDataManager for table: {} is null, skipping",
tableNameWithType);
continue;
}
- if (tableDataManager instanceof RealtimeTableDataManager) {
- Map<Integer, Long> partitionToPrimaryKeyCount =
- ((RealtimeTableDataManager)
tableDataManager).getPartitionToPrimaryKeyCount();
-
- if (!partitionToPrimaryKeyCount.isEmpty()) {
- tablesWithPrimaryKeys.add(tableNameWithType);
- }
-
+ Map<Integer, Long> partitionToPrimaryKeyCount =
tableDataManager.getPartitionToPrimaryKeyCount();
+ if (!partitionToPrimaryKeyCount.isEmpty()) {
+ tablesWithPrimaryKeys.add(tableNameWithType);
for (Long numPrimaryKeys : partitionToPrimaryKeyCount.values()) {
totalPrimaryKeyCount += numPrimaryKeys == null ? 0 : numPrimaryKeys;
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 85a842d54e7..aa6d7f8526c 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -296,12 +296,8 @@ public class TablesResource {
}
}
- // fetch partition to primary key count for realtime tables that have
upsert or dedup enabled
- Map<Integer, Long> partitionToPrimaryKeyCountMap = new HashMap<>();
- if (tableDataManager instanceof RealtimeTableDataManager) {
- RealtimeTableDataManager realtimeTableDataManager =
(RealtimeTableDataManager) tableDataManager;
- partitionToPrimaryKeyCountMap =
realtimeTableDataManager.getPartitionToPrimaryKeyCount();
- }
+ // fetch partition to primary key count for tables that have upsert or
dedup enabled
+ Map<Integer, Long> partitionToPrimaryKeyCountMap =
tableDataManager.getPartitionToPrimaryKeyCount();
// construct partitionToServerPrimaryKeyCountMap to populate in
TableMetadataInfo
Map<Integer, Map<String, Long>> partitionToServerPrimaryKeyCountMap = new
HashMap<>();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]