This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 079206a1bcb Use a single map to store and update the previous Record location keys when reverting Upsert Metadata (#17503)
079206a1bcb is described below
commit 079206a1bcb94c20ff6336bf9ab96d73f5dde1c4
Author: Chaitanya Deepthi <[email protected]>
AuthorDate: Fri Mar 13 00:59:11 2026 -0700
Use a single map to store and update the previous Record location keys when reverting Upsert Metadata (#17503)
* Update AsyncInstanceTable.tsx
* #15162 - Fix Segments going into ERROR state for replicas
* Modify and use a single map instead of 2 maps when reverting the metadata
* Add a check to check for consuming segment
* Simplify the code
* Fix test failure
* Make removal of entry.getKey() in _previousKeyToRecordLocationMap consistent
* Consistent code
* Modify comments
* Fix review comments and make the conditions much simpler to understand
* Make the removal and revert to be in the same method
* Remove eraseKeyToPreviousLocationMap
* Review comments and code changes to add in three modes while reverting and reloading
* Remove unused method
* Change the method name
* Keep the boolean for backward compatibility
* Change the log
* Spotless fixes
* Check for conditions while doing reload and force commit
* Check for inconsistent configs
* Remove the unused code
* Change the variable names
* Change the variable and server config name to something meaningful
* Change the path
* Remove warn logs to not flood with messages
* change the default config name
* should revert on metadata inconsistencies
* Make the constants deprecated instead of changing the names
* Checkstyle fixes
* Edit the comments
* Change the comment
* Remove comments and update the condition to add the keys when current location is in Mutable Segment
* Change the comment
* Extract to a single method instead of having the same logic in multiple classes
* Have a proper OR condition
* Add in logs when there are inconsistencies for any mode
* Simplify the method
* Change the conditions
* Change the record locations
* Modify the comments
* Change the conditions
* Change the configs
* checkstyle changes
* checkstyle changes
* fix the tests
* Condition changes
* Revert to the prev segment locations
* Address review comments
* checkstyle fixes
* Spotless fixes
* Update the comment
* Change the tests and add in condition
* change the class name
* Compilation failures
* Change the comment
* Add in more conditions to test out the current segment's location
* Revert operation on tables
* Simplify the revert process
* Make test changes
* Push changes related to tests
* Change the constants
* Change the constants
* Fix the checkstyle exceptions
* Revert back the unchanged code
* Change the method name
* Change the variable names
* Remove the changes that are not necessary
* Change the method name
* Change the variable name
* Change the code to add in case when keys exist in wrong segment
* Resolve conflicts
* Resolve conflict
* Check the table type for inconsistencies
* Add in protected mode test
* Add in overridden method
* Commit github comment update
Co-authored-by: Copilot <[email protected]>
* Update the github comments on hashing the pk
* Fix the checkstyle exceptions
* Change the HashedPrimaryKey to pk
* Add in clear the map even when inconsistencies exist during PROTECTED mode
* Checkstyle fixes and test failures
* Change the test to test the reload in PROTECTED mode: PartialUpsertTableRebalanceIntegrationTest
* spotless fixes
---------
Co-authored-by: Xiaotian (Jackie) Jiang <[email protected]>
Co-authored-by: Copilot <[email protected]>
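The subject line describes the core refactor: two bookkeeping maps were collapsed into a single map of primary key to previous record location, consulted and cleared when upsert metadata has to be reverted. As an illustrative sketch only — the names `record_locations` and `previous_locations` are hypothetical, and the real logic lives in `ConcurrentMapPartitionUpsertMetadataManager` — the idea looks roughly like this:

```python
# Hypothetical model of single-map revert bookkeeping; not actual Pinot code.
class UpsertMetadataSketch:
    def __init__(self):
        # primary key -> current (segment, docId) location
        self.record_locations = {}
        # single map: primary key -> location before the in-flight update
        # (None marks a key that did not exist before, so revert deletes it)
        self.previous_locations = {}

    def update(self, pk, new_location):
        # Remember each key's prior location exactly once per update batch.
        if pk not in self.previous_locations:
            self.previous_locations[pk] = self.record_locations.get(pk)
        self.record_locations[pk] = new_location

    def revert(self):
        # Undo the batch: restore prior locations, drop newly added keys,
        # then clear the single tracking map.
        for pk, prev in self.previous_locations.items():
            if prev is None:
                self.record_locations.pop(pk, None)
            else:
                self.record_locations[pk] = prev
        self.previous_locations.clear()
```

Keeping one map instead of two lets removal and revert happen in a single pass over the same structure, which is what bullets above such as "Make the removal and revert to be in the same method" refer to.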
---
.../pinot/controller/BaseControllerStarter.java | 7 +-
.../realtime/PinotLLCRealtimeSegmentManager.java | 25 ++-
.../core/data/manager/BaseTableDataManager.java | 31 +--
.../realtime/RealtimeSegmentDataManager.java | 6 +-
.../realtime/UpsertInconsistentStateConfig.java | 102 ---------
.../tests/CommitTimeCompactionIntegrationTest.java | 54 +++--
...PartialUpsertTableRebalanceIntegrationTest.java | 40 ++--
.../models/DummyTableUpsertMetadataManager.java | 20 +-
.../upsert/BasePartitionUpsertMetadataManager.java | 162 ++++++---------
...oncurrentMapPartitionUpsertMetadataManager.java | 187 ++++++++++-------
...nUpsertMetadataManagerForConsistentDeletes.java | 228 ++++++++++++++-------
.../pinot/segment/local/upsert/UpsertContext.java | 10 +
.../pinot/segment/local/upsert/UpsertUtils.java | 42 +++-
.../segment/local/utils/TableConfigUtils.java | 11 +-
.../BasePartitionUpsertMetadataManagerTest.java | 37 ++--
...rrentMapPartitionUpsertMetadataManagerTest.java | 76 +++++--
.../server/starter/helix/BaseServerStarter.java | 7 +-
.../starter/helix/HelixInstanceDataManager.java | 35 ++--
.../apache/pinot/spi/utils/CommonConstants.java | 17 +-
.../ConsumingSegmentConsistencyModeListener.java | 134 ++++++++++++
20 files changed, 757 insertions(+), 474 deletions(-)
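The diffstat shows the listener moving into pinot-spi as `ConsumingSegmentConsistencyModeListener`. Since it reacts to ZK cluster config changes, an operator can change the mode at runtime without a server restart. As a sketch only — the controller host/port are assumptions; the config key is the `pinot.server.consuming.segment.consistency.mode` key referenced in this commit's server-side comments — the controller's cluster-config endpoint would be used like:

```shell
# Sketch: set the consuming segment consistency mode to PROTECTED.
curl -X POST "http://localhost:9000/cluster/configs" \
  -H "Content-Type: application/json" \
  -d '{"pinot.server.consuming.segment.consistency.mode": "PROTECTED"}'

# Deleting the key reverts to the default mode.
curl -X DELETE "http://localhost:9000/cluster/configs/pinot.server.consuming.segment.consistency.mode"
```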
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 31ed430eeec..14ae16a0bbd 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -135,7 +135,6 @@ import org.apache.pinot.controller.validation.ResourceUtilizationChecker;
import org.apache.pinot.controller.validation.ResourceUtilizationManager;
import org.apache.pinot.controller.validation.StorageQuotaChecker;
import org.apache.pinot.controller.validation.UtilizationChecker;
-import org.apache.pinot.core.data.manager.realtime.UpsertInconsistentStateConfig;
import org.apache.pinot.core.instance.context.ControllerContext;
import org.apache.pinot.core.periodictask.PeriodicTask;
import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
@@ -159,6 +158,7 @@ import org.apache.pinot.spi.services.ServiceRole;
import org.apache.pinot.spi.services.ServiceStartable;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
+import org.apache.pinot.spi.utils.ConsumingSegmentConsistencyModeListener;
import org.apache.pinot.spi.utils.InstanceTypeUtils;
import org.apache.pinot.spi.utils.NetUtils;
import org.apache.pinot.spi.utils.PinotMd5Mode;
@@ -768,8 +768,9 @@ public abstract class BaseControllerStarter implements ServiceStartable {
_serviceStatusCallbackList.add(generateServiceStatusCallback(_helixParticipantManager));
_clusterConfigChangeHandler.registerClusterConfigChangeListener(ContinuousJfrStarter.INSTANCE);
- _clusterConfigChangeHandler.registerClusterConfigChangeListener(UpsertInconsistentStateConfig.getInstance());
- LOGGER.info("Registered UpsertInconsistentStateConfig as cluster config change listener");
+ _clusterConfigChangeHandler.registerClusterConfigChangeListener(
+ ConsumingSegmentConsistencyModeListener.getInstance());
+ LOGGER.info("Registered ConsumingSegmentConsistencyModeListener as cluster config change listener");
}
protected PinotLLCRealtimeSegmentManager createPinotLLCRealtimeSegmentManager() {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 113cdac8e5c..7913e88c07c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -105,8 +105,8 @@ import org.apache.pinot.controller.helix.core.retention.strategy.TimeRetentionSt
import org.apache.pinot.controller.helix.core.util.MessagingServiceUtils;
import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
-import org.apache.pinot.core.data.manager.realtime.UpsertInconsistentStateConfig;
import org.apache.pinot.core.util.PeerServerSegmentFinder;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
@@ -134,6 +134,7 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
+import org.apache.pinot.spi.utils.ConsumingSegmentConsistencyModeListener;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.TimeUtils;
@@ -2788,21 +2789,25 @@ public class PinotLLCRealtimeSegmentManager {
* Validates that force commit is allowed for the given table.
* Throws IllegalStateException if force commit is disabled for partial-upsert tables
* or upsert tables with dropOutOfOrder enabled when replication > 1.
+ * Force commit is always allowed for tables without inconsistent state configs.
*/
private void validateForceCommitAllowed(String tableNameWithType) {
TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType);
if (tableConfig == null) {
throw new IllegalStateException("Table config not found for table: " + tableNameWithType);
}
- UpsertInconsistentStateConfig configInstance = UpsertInconsistentStateConfig.getInstance();
- if (!configInstance.isForceCommitReloadAllowed(tableConfig)) {
- throw new IllegalStateException(
- "Force commit disabled for table: " + tableNameWithType
- + ". Table is configured as partial upsert or dropOutOfOrderRecord=true with replication > 1, "
- + "which can cause data inconsistency during force commit. "
- + "Current cluster config '" + configInstance.getConfigKey() + "' is set to: "
- + configInstance.isForceCommitReloadEnabled()
- + ". To enable force commit, set this config to 'true'.");
+ // Only restrict force commit for tables with inconsistent state configs
+ // (partial upsert or dropOutOfOrder tables with replication > 1)
+ boolean isInconsistentMetadataDuringConsumption =
+ TableConfigUtils.isTableTypeInconsistentDuringConsumption(tableConfig);
+ ConsumingSegmentConsistencyModeListener configInstance = ConsumingSegmentConsistencyModeListener.getInstance();
+ if (!configInstance.isForceCommitAllowed() && isInconsistentMetadataDuringConsumption) {
+ throw new IllegalStateException("Force commit disabled for table: " + tableNameWithType
+ + ". Table is configured as partial upsert or dropOutOfOrderRecord=true with replication > 1, "
+ + "which can cause data inconsistency during force commit. " + "Current cluster config '"
+ + configInstance.getConfigKey() + "' is set to: " + configInstance.getConsistencyMode()
+ + ". To enable safer force commit, set cluster config '" + configInstance.getConfigKey()
+ + "' to 'PROTECTED'.");
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 07dd67d376a..815eb5bdb55 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -67,7 +67,6 @@ import org.apache.pinot.common.utils.config.TierConfigUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
-import org.apache.pinot.core.data.manager.realtime.UpsertInconsistentStateConfig;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.StaleSegment;
@@ -87,6 +86,7 @@ import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
import org.apache.pinot.segment.local.utils.SegmentOperationsThrottlerSet;
import org.apache.pinot.segment.local.utils.SegmentReloadSemaphore;
import org.apache.pinot.segment.local.utils.ServerReloadJobStatusCache;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
@@ -115,6 +115,7 @@ import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.ConsumingSegmentConsistencyModeListener;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -957,21 +958,25 @@ public abstract class BaseTableDataManager implements TableDataManager {
if (segmentDataManager instanceof RealtimeSegmentDataManager) {
// Use force commit to reload consuming segment
if (_instanceDataManagerConfig.shouldReloadConsumingSegment()) {
- // Force-committing consuming segments is enabled by default.
- // For partial-upsert tables or upserts with out-of-order events enabled (notably when replication > 1),
- // winner selection could incorrectly favor replicas with fewer consumed rows.
- // This triggered unnecessary reconsumption and resulted in inconsistent upsert state.
- // The new fix restores and correct segment metadata after inconsistencies are noticed.
- // To toggle, existing Force commit behavior dynamically use the cluster config
- // `pinot.server.upsert.force.commit.reload` without restarting servers.
+ // Force-committing consuming segments is restricted only for tables with inconsistent state configs
+ // (partial-upsert or dropOutOfOrderRecord=true with replication > 1).
+ // For these tables, winner selection could incorrectly favor replicas with fewer consumed rows,
+ // triggering unnecessary reconsumption and resulting in inconsistent upsert state.
+ // To enable force commit for such tables, change the cluster config
+ // `pinot.server.consuming.segment.consistency.mode` to PROTECTED for safer reload.
TableConfig tableConfig = indexLoadingConfig.getTableConfig();
- UpsertInconsistentStateConfig config = UpsertInconsistentStateConfig.getInstance();
- if (tableConfig != null && config.isForceCommitReloadAllowed(tableConfig)) {
+ ConsumingSegmentConsistencyModeListener config = ConsumingSegmentConsistencyModeListener.getInstance();
+ boolean isTableTypeInconsistentDuringConsumption =
+ TableConfigUtils.isTableTypeInconsistentDuringConsumption(tableConfig);
+ // Allow force commit if:
+ // 1. Table doesn't have inconsistent configs (non-upsert or standard upsert tables), OR
+ // 2. Consistency mode is PROTECTED or UNSAFE (isForceCommitAllowed = true)
+ if (tableConfig == null || (isTableTypeInconsistentDuringConsumption && !config.isForceCommitAllowed())) {
+ _logger.warn("Skipping reload (force commit) on consuming segment: {} due to inconsistent state config. "
+ + "Change the cluster config: {} to `PROTECTED` for safer commit", segmentName, config.getConfigKey());
+ } else {
_logger.info("Reloading (force committing) consuming segment: {}", segmentName);
((RealtimeSegmentDataManager) segmentDataManager).forceCommit();
- } else {
- _logger.warn("Skipping reload (force commit) on consuming segment: {} due to inconsistent state config. "
- + "Control via cluster config: {}", segmentName, config.getConfigKey());
}
} else {
_logger.warn("Skip reloading consuming segment: {} as configured", segmentName);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index d555c13403b..37c9511d2bb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -83,7 +83,6 @@ import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.ParallelSegmentConsumptionPolicy;
import org.apache.pinot.spi.data.Schema;
@@ -1882,9 +1881,8 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
// TODO: Revisit the non-pauseless handling
if (_partitionUpsertMetadataManager != null) {
UpsertContext upsertContext = _partitionUpsertMetadataManager.getContext();
- if (upsertContext.isAllowPartialUpsertConsumptionDuringCommit() || (
- upsertContext.getUpsertMode() != UpsertConfig.Mode.PARTIAL && !upsertContext.isDropOutOfOrderRecord()
- && upsertContext.getOutOfOrderRecordColumn() == null)) {
+ if (upsertContext.isAllowPartialUpsertConsumptionDuringCommit()
+ || !upsertContext.isTableTypeInconsistentDuringConsumption()) {
return ParallelSegmentConsumptionPolicy.ALLOW_ALWAYS;
}
return pauseless ? ParallelSegmentConsumptionPolicy.ALLOW_DURING_BUILD_ONLY
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/UpsertInconsistentStateConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/UpsertInconsistentStateConfig.java
deleted file mode 100644
index 18953f42e32..00000000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/UpsertInconsistentStateConfig.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.data.manager.realtime;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.annotation.Nullable;
-import org.apache.pinot.segment.local.utils.TableConfigUtils;
-import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.utils.CommonConstants.ConfigChangeListenerConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Singleton class to manage the configuration for force commit and reload on consuming segments
- * for upsert tables with inconsistent state configurations (partial upsert or dropOutOfOrderRecord=true
- * with consistency mode NONE and replication > 1).
- *
- * This configuration is dynamically updatable via ZK cluster config without requiring a server restart.
- */
-public class UpsertInconsistentStateConfig implements PinotClusterConfigChangeListener {
- private static final Logger LOGGER = LoggerFactory.getLogger(UpsertInconsistentStateConfig.class);
- private static final UpsertInconsistentStateConfig INSTANCE = new UpsertInconsistentStateConfig();
-
- private final AtomicBoolean _forceCommitReloadEnabled =
- new AtomicBoolean(ConfigChangeListenerConstants.DEFAULT_FORCE_COMMIT_RELOAD);
-
- private UpsertInconsistentStateConfig() {
- }
-
- public static UpsertInconsistentStateConfig getInstance() {
- return INSTANCE;
- }
-
- /**
- * Checks if force commit/reload is allowed for the given table config.
- *
- * @param tableConfig the table config to check, may be null
- * @return true if force commit/reload is allowed (either globally enabled or table has no inconsistent configs)
- */
- public boolean isForceCommitReloadAllowed(@Nullable TableConfig tableConfig) {
- if (tableConfig == null) {
- return false;
- }
- if (_forceCommitReloadEnabled.get()) {
- return true;
- }
- // Allow if table doesn't have inconsistent state configs
- return !TableConfigUtils.checkForInconsistentStateConfigs(tableConfig);
- }
-
- /**
- * Returns whether force commit/reload is currently enabled globally.
- */
- public boolean isForceCommitReloadEnabled() {
- return _forceCommitReloadEnabled.get();
- }
-
- /**
- * Returns the current config key used for this setting.
- */
- public String getConfigKey() {
- return ConfigChangeListenerConstants.FORCE_COMMIT_RELOAD_CONFIG;
- }
-
- @Override
- public void onChange(Set<String> changedConfigs, Map<String, String> clusterConfigs) {
- if (!changedConfigs.contains(ConfigChangeListenerConstants.FORCE_COMMIT_RELOAD_CONFIG)) {
- return;
- }
-
- String configValue = clusterConfigs.get(ConfigChangeListenerConstants.FORCE_COMMIT_RELOAD_CONFIG);
- boolean forceCommitReloadAllowed = (configValue == null)
- ? ConfigChangeListenerConstants.DEFAULT_FORCE_COMMIT_RELOAD
- : Boolean.parseBoolean(configValue);
-
- boolean previousValue = _forceCommitReloadEnabled.getAndSet(forceCommitReloadAllowed);
- if (previousValue != forceCommitReloadAllowed) {
- LOGGER.info("Updated cluster config: {} from {} to {}",
- ConfigChangeListenerConstants.FORCE_COMMIT_RELOAD_CONFIG, previousValue, forceCommitReloadAllowed);
- }
- }
-}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CommitTimeCompactionIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CommitTimeCompactionIntegrationTest.java
index a8f219a3819..20ac0f8381d 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CommitTimeCompactionIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CommitTimeCompactionIntegrationTest.java
@@ -33,6 +33,8 @@ import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants.ConfigChangeListenerConstants;
+import org.apache.pinot.spi.utils.ConsumingSegmentConsistencyModeListener;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -533,22 +535,29 @@ public class CommitTimeCompactionIntegrationTest extends BaseClusterIntegrationT
validatePreCommitState(tableNameWithoutCompaction, tableNameWithCompaction,
tableNameWithCompactionColumnMajor, 3);
- // Perform commit and wait for completion
- performCommitAndWait(tableNameWithoutCompaction, tableNameWithCompaction,
- tableNameWithCompactionColumnMajor, 20_000L, 4, 2);
+ try {
+ setConsumingSegmentConsistencyMode(ConsumingSegmentConsistencyModeListener.Mode.PROTECTED);
- // Brief wait to ensure all commit operations are complete
- waitForAllDocsLoaded(tableNameWithCompaction, 60_000L, 3);
- waitForAllDocsLoaded(tableNameWithCompactionColumnMajor, 60_000L, 3);
+ // Perform commit and wait for completion
+ performCommitAndWait(tableNameWithoutCompaction, tableNameWithCompaction,
+ tableNameWithCompactionColumnMajor, 20_000L, 4, 2);
- // Validate post-commit compaction effectiveness and data integrity (expecting 3 records, min 2 removed)
- validatePostCommitCompaction(tableNameWithoutCompaction, tableNameWithCompaction,
- tableNameWithCompactionColumnMajor, 3, 2, 0.95);
+ // Brief wait to ensure all commit operations are complete
+ waitForAllDocsLoaded(tableNameWithCompaction, 60_000L, 3);
+ waitForAllDocsLoaded(tableNameWithCompactionColumnMajor, 60_000L, 3);
- // Clean up
- cleanupTablesAndSchemas(
- List.of(tableNameWithoutCompaction, tableNameWithCompaction, tableNameWithCompactionColumnMajor),
- List.of(tableNameWithoutCompaction, tableNameWithCompaction, tableNameWithCompactionColumnMajor));
+ // Validate post-commit compaction effectiveness and data integrity (expecting 3 records, min 2 removed)
+ validatePostCommitCompaction(tableNameWithoutCompaction, tableNameWithCompaction,
+ tableNameWithCompactionColumnMajor, 3, 2, 0.95);
+ } finally {
+ try {
+ resetConsumingSegmentConsistencyMode();
+ } finally {
+ cleanupTablesAndSchemas(
+ List.of(tableNameWithoutCompaction, tableNameWithCompaction, tableNameWithCompactionColumnMajor),
+ List.of(tableNameWithoutCompaction, tableNameWithCompaction, tableNameWithCompactionColumnMajor));
+ }
+ }
}
@Test
@@ -1222,6 +1231,25 @@ public class CommitTimeCompactionIntegrationTest extends BaseClusterIntegrationT
});
}
+ private void setConsumingSegmentConsistencyMode(ConsumingSegmentConsistencyModeListener.Mode mode)
+ throws Exception {
+ updateClusterConfig(Map.of(ConfigChangeListenerConstants.CONSUMING_SEGMENT_CONSISTENCY_MODE, mode.name()));
+ waitForConsumingSegmentConsistencyMode(mode);
+ }
+
+ private void resetConsumingSegmentConsistencyMode()
+ throws Exception {
+ deleteClusterConfig(ConfigChangeListenerConstants.CONSUMING_SEGMENT_CONSISTENCY_MODE);
+ waitForConsumingSegmentConsistencyMode(
+ ConsumingSegmentConsistencyModeListener.Mode.DEFAULT_CONSUMING_SEGMENT_CONSISTENCY_MODE);
+ }
+
+ private void waitForConsumingSegmentConsistencyMode(ConsumingSegmentConsistencyModeListener.Mode expectedMode) {
+ TestUtils.waitForCondition(
+ aVoid -> ConsumingSegmentConsistencyModeListener.getInstance().getConsistencyMode() == expectedMode, 10_000L,
+ "Timed out waiting for consuming segment consistency mode to become: " + expectedMode);
+ }
+
@Test
public void testCommitTimeCompactionPreservesDeletedRecords()
throws Exception {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
index e3bbde37258..ae2fc011285 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
@@ -49,6 +49,7 @@ import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.ConsumingSegmentConsistencyModeListener;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
@@ -232,20 +233,31 @@ public class PartialUpsertTableRebalanceIntegrationTest extends BaseClusterInteg
pushAvroIntoKafka(_avroFiles);
waitForAllDocsLoaded(600_000L, 300);
- String statusResponse = reloadRealtimeTable(getTableName());
- Map<String, String> statusResponseJson =
- JsonUtils.stringToObject(statusResponse, new TypeReference<Map<String, String>>() {
- });
- String reloadResponse = statusResponseJson.get("status");
- int jsonStartIndex = reloadResponse.indexOf("{");
- String trimmedResponse = reloadResponse.substring(jsonStartIndex);
- Map<String, Map<String, String>> reloadStatus =
- JsonUtils.stringToObject(trimmedResponse, new TypeReference<Map<String, Map<String, String>>>() {
- });
- String reloadJobId = reloadStatus.get(REALTIME_TABLE_NAME).get("reloadJobId");
- waitForReloadToComplete(reloadJobId, 600_000L);
- waitForAllDocsLoaded(600_000L, 300);
- verifyIdealState(4, NUM_SERVERS); // 4 because reload triggers commit of consuming segments
+ // Partial-upsert tables need PROTECTED consuming segment consistency mode for reload to force-commit
+ // consuming segments; RESTRICTED (default) skips force-commit and reload never completes.
+ try {
+ updateClusterConfig(
+ Map.of(CommonConstants.ConfigChangeListenerConstants.CONSUMING_SEGMENT_CONSISTENCY_MODE,
+ ConsumingSegmentConsistencyModeListener.Mode.PROTECTED.name()));
+ Thread.sleep(5000); // Allow server to pick up cluster config from ZK
+
+ String statusResponse = reloadRealtimeTable(getTableName());
+ Map<String, String> statusResponseJson =
+ JsonUtils.stringToObject(statusResponse, new TypeReference<Map<String, String>>() {
+ });
+ String reloadResponse = statusResponseJson.get("status");
+ int jsonStartIndex = reloadResponse.indexOf("{");
+ String trimmedResponse = reloadResponse.substring(jsonStartIndex);
+ Map<String, Map<String, String>> reloadStatus =
+ JsonUtils.stringToObject(trimmedResponse, new TypeReference<Map<String, Map<String, String>>>() {
+ });
+ String reloadJobId = reloadStatus.get(REALTIME_TABLE_NAME).get("reloadJobId");
+ waitForReloadToComplete(reloadJobId, 600_000L);
+ waitForAllDocsLoaded(600_000L, 300);
+ verifyIdealState(4, NUM_SERVERS); // 4 because reload triggers commit of consuming segments
+ } finally {
+ deleteClusterConfig(CommonConstants.ConfigChangeListenerConstants.CONSUMING_SEGMENT_CONSISTENCY_MODE);
+ }
}
@AfterMethod
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java
index b7c9b733ef4..6482612be90 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java
@@ -33,6 +33,7 @@ import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
@@ -78,10 +79,6 @@ public class DummyTableUpsertMetadataManager extends BaseTableUpsertMetadataMana
return false;
}
- @Override
- protected void removeNewlyAddedKeys(IndexSegment oldSegment) {
- }
-
@Override
protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds) {
}
@@ -91,17 +88,26 @@ public class DummyTableUpsertMetadataManager extends BaseTableUpsertMetadataMana
return null;
}
+ @Override
+ protected void revertAndRemoveSegment(IndexSegment segment,
+ Iterator<Map.Entry<Integer, PrimaryKey>> primaryKeyIterator) {
+ }
+
+ @Override
+ protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey> primaryKeyIterator) {
+ }
+
@Override
protected void doRemoveExpiredPrimaryKeys() {
}
@Override
- protected void revertCurrentSegmentUpsertMetadata(IndexSegment oldSegment,
- ThreadSafeMutableRoaringBitmap validDocIds, ThreadSafeMutableRoaringBitmap queryableDocIds) {
+ protected int getPrevKeyToRecordLocationSize() {
+ return 0;
}
@Override
- protected void eraseKeyToPreviousLocationMap() {
+ protected void clearPrevKeyToRecordLocation() {
}
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index aa43d89f819..1c9975da3f9 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -49,7 +49,6 @@ import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
-import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
import org.apache.pinot.segment.local.utils.HashUtils;
import org.apache.pinot.segment.local.utils.SegmentPreloadUtils;
import org.apache.pinot.segment.local.utils.WatermarkUtils;
@@ -67,6 +66,7 @@ import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.apache.pinot.spi.utils.BooleanUtils;
+import org.apache.pinot.spi.utils.ConsumingSegmentConsistencyModeListener;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.roaringbitmap.PeekableIntIterator;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
@@ -293,7 +293,6 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
}
try {
doAddSegment((ImmutableSegmentImpl) segment);
- eraseKeyToPreviousLocationMap();
_trackedSegments.add(segment);
if (_enableSnapshot) {
_updatedSegmentsSinceLastSnapshot.add(segment);
@@ -420,7 +419,6 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
}
try {
doPreloadSegment((ImmutableSegmentImpl) segment);
- eraseKeyToPreviousLocationMap();
_trackedSegments.add(segment);
_updatedSegmentsSinceLastSnapshot.add(segment);
} finally {
@@ -587,7 +585,6 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
}
try {
doReplaceSegment(segment, oldSegment);
- eraseKeyToPreviousLocationMap();
if (!(segment instanceof EmptyIndexSegment)) {
_trackedSegments.add(segment);
if (_enableSnapshot) {
@@ -637,11 +634,9 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
}
- private static final int MAX_UPSERT_REVERT_RETRIES = 3;
-
/**
- * NOTE: We allow passing in validDocIds and queryableDocIds here so that the value can be easily accessed from the
- * tests. The passed in bitmaps should always be empty.
+ * NOTE: We allow passing in validDocIds and queryableDocIds here so that the value can be easily accessed from tests.
+ * The passed-in bitmaps should always be empty.
*/
@VisibleForTesting
public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
@@ -678,108 +673,74 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
validDocIdsForOldSegment = getValidDocIdsForOldSegment(oldSegment);
}
if (validDocIdsForOldSegment != null &&
!validDocIdsForOldSegment.isEmpty()) {
- checkForInconsistencies(segment, validDocIds, queryableDocIds,
oldSegment, validDocIdsForOldSegment, segmentName);
+ if (_context.isTableTypeInconsistentDuringConsumption()) {
+ if (shouldRevertMetadataOnInconsistency(oldSegment)) {
+ // If there are still valid docs in the old segment, validate and
revert the metadata of the
+ // consuming segment in place
+ revertSegmentUpsertMetadata(oldSegment, segmentName,
validDocIdsForOldSegment);
+ return;
+ } else {
+ logInconsistentResults(segmentName,
validDocIdsForOldSegment.getCardinality());
+ }
+ }
removeSegment(oldSegment, validDocIdsForOldSegment);
}
}
- void checkForInconsistencies(ImmutableSegment segment,
ThreadSafeMutableRoaringBitmap validDocIds,
- ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment oldSegment,
- MutableRoaringBitmap validDocIdsForOldSegment, String segmentName) {
- int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
- boolean isConsumingSegmentSeal = !(oldSegment instanceof ImmutableSegment);
- // For partial-upsert tables and upsert tables with
dropOutOfOrderRecord=true or outOfOrderRecordColumn configured,
- // we do not store previous record locations and instead remove all
primary keys that are not replaced. This can
- // lead to inconsistencies across replicas when a consuming segment is
replaced by a committed segment generated
- // on a different server with a different set of records. Such scenarios
can occur when stream consumers do not
- // guarantee the same consumption order, or when a segment with fewer
consumed rows replaces another segment.
- // To prevent these inconsistencies, we persist the previous record
locations so that we can revert to them and
- // restore consistency across replicas.
- if (isConsumingSegmentSeal && (_context.isDropOutOfOrderRecord() ||
_context.getOutOfOrderRecordColumn() != null)) {
- _logger.warn("Found {} primary keys not replaced when sealing consuming
segment: {} for upsert table with "
- + "dropOutOfOrderRecord or outOfOrderRecordColumn enabled. This
can potentially cause inconsistency "
- + "between replicas. Reverting back metadata changes and
triggering segment replacement.",
- numKeysNotReplaced,
- segmentName);
- revertSegmentUpsertMetadataWithRetry(segment, validDocIds,
queryableDocIds, oldSegment, segmentName);
- } else if (isConsumingSegmentSeal && _partialUpsertHandler != null) {
- _logger.warn("Found {} primary keys not replaced when sealing consuming
segment: {} for partial-upsert table. "
- + "This can potentially cause inconsistency between replicas. "
- + "Reverting metadata changes and triggering segment replacement.",
numKeysNotReplaced, segmentName);
- revertSegmentUpsertMetadataWithRetry(segment, validDocIds,
queryableDocIds, oldSegment, segmentName);
- } else {
- _logger.warn("Found {} primary keys not replaced for the segment: {}.",
numKeysNotReplaced, segmentName);
- }
+ /**
+ * Determines whether metadata should be reverted when inconsistencies are
detected during segment replacement.
+ * This is only applicable when in PROTECTED mode, the old segment is a
mutable segment,
+ * and the table has inconsistent state configurations.
+ *
+ * @param oldSegment the old segment being replaced
+ * @return true if metadata revert should be performed on inconsistency
+ */
+ public boolean shouldRevertMetadataOnInconsistency(IndexSegment oldSegment) {
+ return
ConsumingSegmentConsistencyModeListener.getInstance().getConsistencyMode()
+ .equals(ConsumingSegmentConsistencyModeListener.Mode.PROTECTED) &&
oldSegment instanceof MutableSegment
+ && _context.isTableTypeInconsistentDuringConsumption();
}
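For readers outside the Pinot codebase, the three-way gate in `shouldRevertMetadataOnInconsistency` can be sketched with simplified stand-in types. The `Mode` enum and boolean flags below are hypothetical stand-ins for `ConsumingSegmentConsistencyModeListener.Mode`, the `MutableSegment` instanceof check, and `UpsertContext.isTableTypeInconsistentDuringConsumption()`:

```java
// Minimal sketch of the revert gate; Mode and the boolean parameters are
// simplified stand-ins for the Pinot types used in the patch.
public class RevertGate {
  public enum Mode { DEFAULT, PROTECTED }

  // Revert only when the server runs in PROTECTED consistency mode, the
  // replaced segment is still mutable (consuming), and the table config can
  // yield inconsistent replicas during consumption (partial upsert,
  // dropOutOfOrderRecord, or outOfOrderRecordColumn).
  public static boolean shouldRevert(Mode mode, boolean oldSegmentIsMutable,
      boolean tableInconsistentDuringConsumption) {
    return mode == Mode.PROTECTED && oldSegmentIsMutable && tableInconsistentDuringConsumption;
  }
}
```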
/**
- * Reverts segment upsert metadata and retries addOrReplaceSegment with a
maximum retry limit to prevent infinite
- * recursion in case of persistent inconsistencies.
+ * Reverts segment upsert metadata
*/
- void revertSegmentUpsertMetadataWithRetry(ImmutableSegment segment,
ThreadSafeMutableRoaringBitmap validDocIds,
- ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment oldSegment,
String segmentName) {
- for (int retryCount = 0; retryCount < MAX_UPSERT_REVERT_RETRIES;
retryCount++) {
- revertCurrentSegmentUpsertMetadata(oldSegment, validDocIds,
queryableDocIds);
- MutableRoaringBitmap validDocIdsForOldSegment =
getValidDocIdsForOldSegment(oldSegment);
- try (UpsertUtils.RecordInfoReader recordInfoReader = new
UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
- _comparisonColumns, _deleteRecordColumn)) {
- Iterator<RecordInfo> latestRecordInfoIterator =
- UpsertUtils.getRecordInfoIterator(recordInfoReader,
segment.getSegmentMetadata().getTotalDocs());
- addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds,
queryableDocIds, latestRecordInfoIterator,
- oldSegment, validDocIdsForOldSegment);
- } catch (Exception e) {
- throw new RuntimeException(
- String.format("Caught exception while replacing segment metadata
during inconsistencies: %s, table: %s",
- segmentName, _tableNameWithType), e);
- }
-
- validDocIdsForOldSegment = getValidDocIdsForOldSegment(oldSegment);
- if (validDocIdsForOldSegment.isEmpty()) {
- _logger.info("Successfully resolved inconsistency for segment: {}
after {} retry attempt(s)", segmentName,
- retryCount + 1);
- return;
- }
-
- int numKeysStillNotReplaced = validDocIdsForOldSegment.getCardinality();
- if (retryCount < MAX_UPSERT_REVERT_RETRIES - 1) {
- _logger.warn("Retry {}/{}: Still found {} primary keys not replaced
for segment: {}. Retrying...",
- retryCount + 1, MAX_UPSERT_REVERT_RETRIES,
numKeysStillNotReplaced, segmentName);
- } else {
- _logger.error("Exhausted all {} retries for segment: {}. Found {}
primary keys still not replaced. "
- + "Proceeding with current state which may cause
inconsistency.", MAX_UPSERT_REVERT_RETRIES,
- segmentName,
- numKeysStillNotReplaced);
- if (_context.isDropOutOfOrderRecord() ||
_context.getOutOfOrderRecordColumn() != null) {
- _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.REALTIME_UPSERT_INCONSISTENT_ROWS,
- numKeysStillNotReplaced);
- } else if (_partialUpsertHandler != null) {
- _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
- numKeysStillNotReplaced);
- }
- }
+ protected void revertSegmentUpsertMetadata(IndexSegment oldSegment, String
segmentName,
+ MutableRoaringBitmap validDocIdsForOldSegment) {
+ _logger.info("Inconsistencies noticed for the segment: {} across servers,
reverting the metadata to resolve...",
+ segmentName);
+ // Revert the keys in the segment to previous location and remove the
newly added keys
+ removeSegment(oldSegment, validDocIdsForOldSegment);
+ if (getPrevKeyToRecordLocationSize() == 0) {
+ _logger.info("Successfully resolved inconsistency for segment: {} across
servers", segmentName);
+ return;
+ }
+ int numKeysStillNotReplaced = getPrevKeyToRecordLocationSize();
+ if (numKeysStillNotReplaced > 0) {
+ logInconsistentResults(segmentName, numKeysStillNotReplaced);
+ // Clear the map when inconsistencies still exist for the consuming
segments
+ clearPrevKeyToRecordLocation();
}
}
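The residual handling in `revertSegmentUpsertMetadata` boils down to: after the revert pass, whatever is left in the previous-location map is a key that could not be restored, so it is reported once and the map is cleared. A minimal self-contained sketch, where the map and counter are stand-ins for `_previousKeyToRecordLocationMap` and the `logInconsistentResults` / metrics calls:

```java
import java.util.concurrent.ConcurrentHashMap;

public class RevertResidual {
  // Stand-in for _previousKeyToRecordLocationMap.
  final ConcurrentHashMap<Object, Object> prevKeyToLocation = new ConcurrentHashMap<>();
  int reportedInconsistent = 0;  // stand-in for logInconsistentResults + metrics

  // After the revert pass, leftover entries are keys that could not be
  // restored; report them once and clear the map so stale state does not
  // leak into the next segment replacement. Returns the residual count.
  int finishRevert() {
    int residual = prevKeyToLocation.size();
    if (residual > 0) {
      reportedInconsistent = residual;
      prevKeyToLocation.clear();
    }
    return residual;
  }
}
```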
- protected abstract void removeNewlyAddedKeys(IndexSegment oldSegment);
-
- protected abstract void eraseKeyToPreviousLocationMap();
-
- protected abstract void revertCurrentSegmentUpsertMetadata(IndexSegment
oldSegment,
- ThreadSafeMutableRoaringBitmap validDocIds,
ThreadSafeMutableRoaringBitmap queryableDocIds);
+ protected void logInconsistentResults(String segmentName, int
numKeysStillNotReplaced) {
+ _logger.error("Found {} primary keys not replaced for segment: {}. "
+ + "Proceeding with current state which may cause inconsistency. To
correct this behaviour from now, set "
+ + "cluster config:
`pinot.server.consuming.segment.consistency.mode` to `PROTECTED`",
+ numKeysStillNotReplaced, segmentName);
+ if (_context.isDropOutOfOrderRecord() ||
_context.getOutOfOrderRecordColumn() != null) {
+ _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.REALTIME_UPSERT_INCONSISTENT_ROWS,
+ numKeysStillNotReplaced);
+ } else if (_partialUpsertHandler != null) {
+ _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
+ numKeysStillNotReplaced);
+ }
+ }
private MutableRoaringBitmap getValidDocIdsForOldSegment(IndexSegment
oldSegment) {
return oldSegment.getValidDocIds() != null ?
oldSegment.getValidDocIds().getMutableRoaringBitmap() : null;
}
- protected void removeSegment(IndexSegment segment, MutableRoaringBitmap
validDocIds) {
- try (PrimaryKeyReader primaryKeyReader = new PrimaryKeyReader(segment,
_primaryKeyColumns)) {
- removeSegment(segment,
UpsertUtils.getPrimaryKeyIterator(primaryKeyReader, validDocIds));
- } catch (Exception e) {
- throw new RuntimeException(
- String.format("Caught exception while removing segment: %s, table:
%s, message: %s", segment.getSegmentName(),
- _tableNameWithType, e.getMessage()), e);
- }
- }
+ protected abstract void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds);
protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey> primaryKeyIterator) {
throw new UnsupportedOperationException("Both removeSegment(segment,
validDocID) and "
@@ -825,7 +786,11 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
}
_logger.info("Removing {} primary keys for segment: {}",
validDocIds.getCardinality(), segmentName);
- removeSegment(segment, validDocIds);
+ if (shouldRevertMetadataOnInconsistency(segment)) {
+ revertSegmentUpsertMetadata(segment, segmentName, validDocIds);
+ } else {
+ removeSegment(segment, validDocIds);
+ }
// Update metrics
long numPrimaryKeys = getNumPrimaryKeys();
@@ -1142,6 +1107,9 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
}
}
+ protected abstract void revertAndRemoveSegment(IndexSegment segment,
+ Iterator<Map.Entry<Integer, PrimaryKey>> primaryKeyIterator);
+
/**
* Removes all primary keys that have comparison value smaller than
(largestSeenComparisonValue - TTL).
*/
@@ -1327,4 +1295,8 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
// Fall back to local creation time if ZK creation time is not set
return segmentMetadata.getIndexCreationTime();
}
+
+ protected abstract int getPrevKeyToRecordLocationSize();
+
+ protected abstract void clearPrevKeyToRecordLocation();
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index 6edc1020150..a6c0721e5fe 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -32,6 +32,7 @@ import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ServerMeter;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.segment.readers.LazyRow;
+import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
import org.apache.pinot.segment.local.utils.HashUtils;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
@@ -55,8 +56,8 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
@VisibleForTesting
final ConcurrentHashMap<Object, RecordLocation>
_primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
- private final ConcurrentHashMap<Object, RecordLocation>
_previousKeyToRecordLocationMap = new ConcurrentHashMap<>();
- private final Map<Object, RecordLocation> _newlyAddedKeys = new
ConcurrentHashMap<>();
+ @VisibleForTesting
+ final ConcurrentHashMap<Object, RecordLocation>
_previousKeyToRecordLocationMap = new ConcurrentHashMap<>();
public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType,
int partitionId, UpsertContext context) {
super(tableNameWithType, partitionId, context);
@@ -93,12 +94,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
if (currentSegment == segment) {
if (comparisonResult >= 0) {
replaceDocId(segment, validDocIds, queryableDocIds,
currentDocId, newDocId, recordInfo);
- RecordLocation newRecordLocation = new
RecordLocation(segment, newDocId, newComparisonValue);
- // Track the record location of the newly added keys
- if (_newlyAddedKeys.containsKey(primaryKey)) {
- _newlyAddedKeys.put(primaryKey, newRecordLocation);
- }
- return newRecordLocation;
+ return new RecordLocation(segment, newDocId,
newComparisonValue);
} else {
return currentRecordLocation;
}
@@ -121,8 +117,19 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
validDocIdsForOldSegment.remove(currentDocId);
}
}
+ if (_context.isTableTypeInconsistentDuringConsumption()
+ && currentSegment instanceof MutableSegment) {
+ _previousKeyToRecordLocationMap.remove(primaryKey);
+ }
return new RecordLocation(segment, newDocId,
newComparisonValue);
} else {
+ RecordLocation prevRecordLocation =
_previousKeyToRecordLocationMap.get(primaryKey);
+ if (_context.isTableTypeInconsistentDuringConsumption() &&
currentSegment instanceof MutableSegment
+ && (prevRecordLocation == null
+ ||
newComparisonValue.compareTo(prevRecordLocation.getComparisonValue()) >= 0)) {
+ RecordLocation newRecordLocation = new
RecordLocation(segment, newDocId, newComparisonValue);
+ _previousKeyToRecordLocationMap.put(primaryKey,
newRecordLocation);
+ }
return currentRecordLocation;
}
}
@@ -134,8 +141,18 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
numKeysInWrongSegment.getAndIncrement();
if (comparisonResult >= 0) {
addDocId(segment, validDocIds, queryableDocIds, newDocId,
recordInfo);
+ if (_context.isTableTypeInconsistentDuringConsumption() &&
currentSegment instanceof MutableSegment) {
+ _previousKeyToRecordLocationMap.remove(primaryKey);
+ }
return new RecordLocation(segment, newDocId,
newComparisonValue);
} else {
+ RecordLocation prevRecordLocation =
_previousKeyToRecordLocationMap.get(primaryKey);
+ if (_context.isTableTypeInconsistentDuringConsumption() &&
currentSegment instanceof MutableSegment
+ && (prevRecordLocation == null
+ ||
newComparisonValue.compareTo(prevRecordLocation.getComparisonValue()) >= 0)) {
+ RecordLocation newRecordLocation = new
RecordLocation(segment, newDocId, newComparisonValue);
+ _previousKeyToRecordLocationMap.put(primaryKey,
newRecordLocation);
+ }
return currentRecordLocation;
}
}
@@ -148,19 +165,24 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
currentSegmentName,
getAuthoritativeUpdateOrCreationTime(segment),
getAuthoritativeUpdateOrCreationTime(currentSegment)))) {
replaceDocId(segment, validDocIds, queryableDocIds,
currentSegment, currentDocId, newDocId, recordInfo);
- if (currentSegment != segment) {
- _previousKeyToRecordLocationMap.put(primaryKey,
currentRecordLocation);
+ if (_context.isTableTypeInconsistentDuringConsumption() &&
currentSegment instanceof MutableSegment) {
+ _previousKeyToRecordLocationMap.remove(primaryKey);
}
return new RecordLocation(segment, newDocId,
newComparisonValue);
} else {
+ RecordLocation prevRecordLocation =
_previousKeyToRecordLocationMap.get(primaryKey);
+ if (_context.isTableTypeInconsistentDuringConsumption() &&
currentSegment instanceof MutableSegment
+ && (prevRecordLocation == null
+ ||
newComparisonValue.compareTo(prevRecordLocation.getComparisonValue()) >= 0)) {
+ RecordLocation newRecordLocation = new
RecordLocation(segment, newDocId, newComparisonValue);
+ _previousKeyToRecordLocationMap.put(primaryKey,
newRecordLocation);
+ }
return currentRecordLocation;
}
} else {
// New primary key
addDocId(segment, validDocIds, queryableDocIds, newDocId,
recordInfo);
- RecordLocation newRecordLocation = new RecordLocation(segment,
newDocId, newComparisonValue);
- _newlyAddedKeys.put(primaryKey, newRecordLocation);
- return newRecordLocation;
+ return new RecordLocation(segment, newDocId, newComparisonValue);
}
});
}
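The pattern repeated across the hunks above — drop the saved previous location when the incoming record wins against a consuming segment, and otherwise remember only the newest losing location as a revert target — can be sketched in isolation. The `long` comparison value and `String` keys below are simplifications of `RecordLocation` and the hashed primary key:

```java
import java.util.concurrent.ConcurrentHashMap;

public class PrevLocationTracker {
  // Stand-in for _previousKeyToRecordLocationMap: key -> comparison value
  // of the best fallback record seen so far.
  final ConcurrentHashMap<String, Long> prev = new ConcurrentHashMap<>();

  // Incoming record won against a consuming segment: any saved previous
  // location for this key is now stale.
  void onNewerRecordWins(String pk) {
    prev.remove(pk);
  }

  // Incoming record lost (out-of-order): remember its location so a later
  // revert can fall back to it, keeping only the newest such location.
  void onOutOfOrderRecord(String pk, long comparisonValue) {
    Long saved = prev.get(pk);
    if (saved == null || comparisonValue >= saved) {
      prev.put(pk, comparisonValue);
    }
  }
}
```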
@@ -192,6 +214,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
_primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey,
_hashFunction),
(pk, recordLocation) -> {
if (recordLocation.getSegment() == segment) {
+ if (_context.isTableTypeInconsistentDuringConsumption() &&
segment instanceof MutableSegment) {
+ _previousKeyToRecordLocationMap.remove(pk);
+ }
return null;
}
return recordLocation;
@@ -199,6 +224,72 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
}
}
+ @Override
+ protected void revertAndRemoveSegment(IndexSegment segment,
+ Iterator<Map.Entry<Integer, PrimaryKey>> primaryKeyIterator) {
+ while (primaryKeyIterator.hasNext()) {
+ Map.Entry<Integer, PrimaryKey> primaryKeyEntry =
primaryKeyIterator.next();
+ PrimaryKey primaryKey = primaryKeyEntry.getValue();
+ int docId = primaryKeyEntry.getKey();
+
_primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey,
_hashFunction),
+ (pk, recordLocation) -> {
+ RecordLocation prevLocation =
_previousKeyToRecordLocationMap.get(pk);
+ if (prevLocation == null) {
+ _previousKeyToRecordLocationMap.remove(pk);
+ return null;
+ }
+ if (recordLocation.getSegment() == segment) {
+ // Revert to previous segment location
+ IndexSegment prevSegment = prevLocation.getSegment();
+ ThreadSafeMutableRoaringBitmap prevValidDocIds =
prevSegment.getValidDocIds();
+ if (prevValidDocIds != null) {
+ try (UpsertUtils.RecordInfoReader recordInfoReader = new
UpsertUtils.RecordInfoReader(prevSegment,
+ _primaryKeyColumns, _comparisonColumns,
_deleteRecordColumn)) {
+ int prevDocId = prevLocation.getDocId();
+ RecordInfo recordInfo =
recordInfoReader.getRecordInfo(prevDocId);
+ replaceDocId(prevSegment, prevValidDocIds,
prevSegment.getQueryableDocIds(), segment, docId,
+ prevDocId, recordInfo);
+ _previousKeyToRecordLocationMap.remove(pk);
+ return prevLocation;
+ } catch (IOException e) {
+ _logger.warn("Failed to revert to previous segment: {},
removing key", prevSegment.getSegmentName(),
+ e);
+ _previousKeyToRecordLocationMap.remove(pk);
+ return null;
+ }
+ } else {
+ _previousKeyToRecordLocationMap.remove(pk);
+ return null;
+ }
+ } else if (recordLocation.getSegment() instanceof
ImmutableSegmentImpl) {
+ _previousKeyToRecordLocationMap.remove(pk);
+ } else {
+ _logger.warn(
+ "Consuming segment: {} has already ingested the primary key
for docId {} from segment: {}, suggesting"
+ + " that consumption is occurring concurrently with
segment replacement, which is undesirable "
+ + "for consistency.",
recordLocation.getSegment().getSegmentName(), primaryKeyEntry.getKey(),
+ segment.getSegmentName());
+ }
+ return recordLocation;
+ });
+ }
+ }
+
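Stripped of the `replaceDocId` bookkeeping and the record-reader plumbing, `revertAndRemoveSegment` walks the removed segment's keys and either restores the saved previous location or drops the key. A simplified sketch with stand-in types (`Loc` replaces `RecordLocation`, segment names replace segment references):

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RevertAndRemove {
  // Stand-in for RecordLocation: segment name plus doc id.
  record Loc(String segment, int docId) {}

  final ConcurrentHashMap<String, Loc> primary = new ConcurrentHashMap<>();   // _primaryKeyToRecordLocationMap
  final ConcurrentHashMap<String, Loc> previous = new ConcurrentHashMap<>();  // _previousKeyToRecordLocationMap

  void revertAndRemove(String removedSegment, Iterator<Map.Entry<Integer, String>> docIdAndKey) {
    while (docIdAndKey.hasNext()) {
      String pk = docIdAndKey.next().getValue();
      primary.computeIfPresent(pk, (k, loc) -> {
        Loc prev = previous.remove(k);
        if (!loc.segment().equals(removedSegment)) {
          return loc;  // key already re-ingested elsewhere; keep current location
        }
        // Restore the previous location if one was saved; returning null
        // removes the key entirely (computeIfPresent semantics).
        return prev;
      });
    }
  }
}
```

Returning `null` from `computeIfPresent` removes the mapping atomically, which is why the real code can revert and remove in a single pass without extra locking.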
+ @Override
+ protected void removeSegment(IndexSegment segment, MutableRoaringBitmap
validDocIds) {
+ try (PrimaryKeyReader primaryKeyReader = new PrimaryKeyReader(segment,
_primaryKeyColumns)) {
+ if (shouldRevertMetadataOnInconsistency(segment)) {
+ revertAndRemoveSegment(segment,
UpsertUtils.getRecordIterator(primaryKeyReader, validDocIds));
+ } else {
+ removeSegment(segment,
UpsertUtils.getPrimaryKeyIterator(primaryKeyReader, validDocIds));
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Caught exception while removing segment: %s, table:
%s, message: %s", segment.getSegmentName(),
+ _tableNameWithType, e.getMessage()), e);
+ }
+ }
+
@Override
public void doRemoveExpiredPrimaryKeys() {
AtomicInteger numMetadataTTLKeysRemoved = new AtomicInteger();
@@ -260,57 +351,13 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
}
@Override
- protected void revertCurrentSegmentUpsertMetadata(IndexSegment oldSegment,
ThreadSafeMutableRoaringBitmap validDocIds,
- ThreadSafeMutableRoaringBitmap queryableDocIds) {
- // Revert to previous locations present in other segment
- // Replace the valid doc id to that segment location
- _logger.info("Reverting Upsert metadata for {} keys for the segment: {}",
_previousKeyToRecordLocationMap.size(),
- oldSegment.getSegmentName());
- for (Map.Entry<Object, RecordLocation> obj :
_previousKeyToRecordLocationMap.entrySet()) {
- IndexSegment prevSegment = obj.getValue().getSegment();
- RecordLocation currentLocation =
_primaryKeyToRecordLocationMap.get(obj.getKey());
- if (prevSegment != null) {
- if (currentLocation != null && currentLocation.getSegment() ==
oldSegment) {
- try (UpsertUtils.RecordInfoReader recordInfoReader = new
UpsertUtils.RecordInfoReader(prevSegment,
- _primaryKeyColumns, _comparisonColumns, _deleteRecordColumn)) {
- int newDocId = obj.getValue().getDocId();
- int currentDocId =
_primaryKeyToRecordLocationMap.get(obj.getKey()).getDocId();
- RecordInfo recordInfo = recordInfoReader.getRecordInfo(newDocId);
- replaceDocId(prevSegment, prevSegment.getValidDocIds(),
prevSegment.getQueryableDocIds(), oldSegment,
- currentDocId, newDocId, recordInfo);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- _primaryKeyToRecordLocationMap.put(obj.getKey(), obj.getValue());
- }
- } else {
- _primaryKeyToRecordLocationMap.remove(obj.getKey());
- }
- }
- _logger.info("Reverted Upsert metadata of {} keys to previous segment
locations for the segment: {}",
- _previousKeyToRecordLocationMap.size(), oldSegment.getSegmentName());
- removeNewlyAddedKeys(oldSegment);
- }
-
- @Override
- protected void removeNewlyAddedKeys(IndexSegment oldSegment) {
- // Remove the newly added keys in the metadata map and in the valid doc ids
- int removedKeys = 0;
- for (Map.Entry<Object, RecordLocation> entry : _newlyAddedKeys.entrySet())
{
- if (entry.getValue().getSegment() == oldSegment) {
- _primaryKeyToRecordLocationMap.remove(entry.getKey());
- removeDocId(oldSegment, entry.getValue().getDocId());
- removedKeys++;
- }
- }
- _logger.info("Removed newly added {} keys for the segment: {} out of :
{}", removedKeys,
- oldSegment.getSegmentName(), _previousKeyToRecordLocationMap.size());
+ protected int getPrevKeyToRecordLocationSize() {
+ return _previousKeyToRecordLocationMap.size();
}
@Override
- protected void eraseKeyToPreviousLocationMap() {
+ protected void clearPrevKeyToRecordLocation() {
_previousKeyToRecordLocationMap.clear();
- _newlyAddedKeys.clear();
}
@Override
@@ -326,7 +373,6 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
double comparisonValue = ((Number) newComparisonValue).doubleValue();
_largestSeenComparisonValue.getAndUpdate(v -> Math.max(v,
comparisonValue));
}
-
_primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(),
_hashFunction),
(primaryKey, currentRecordLocation) -> {
if (currentRecordLocation != null) {
@@ -339,19 +385,18 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
RecordLocation newRecordLocation = new RecordLocation(segment,
newDocId, newComparisonValue);
if (segment == currentSegment) {
replaceDocId(segment, validDocIds, queryableDocIds,
currentDocId, newDocId, recordInfo);
- // Track the record location of the newly added keys
- if (_newlyAddedKeys.containsKey(primaryKey)) {
- _newlyAddedKeys.put(primaryKey, newRecordLocation);
- }
} else {
- _previousKeyToRecordLocationMap.put(primaryKey,
currentRecordLocation);
+ // For consistent ParallelSegmentConsumptionPolicy like
DISALLOW_ALWAYS and ALLOW_DURING_BUILD_ONLY,
+ // we shouldn't be seeing two mutable segments, so ideally
current segment shouldn't point to
+ // Mutable segment, unless other modes are enabled which could
lead to inconsistent behaviour
+ if (_context.isTableTypeInconsistentDuringConsumption()
+ && !(currentSegment instanceof MutableSegment)) {
+ _previousKeyToRecordLocationMap.put(primaryKey,
currentRecordLocation);
+ }
replaceDocId(segment, validDocIds, queryableDocIds,
currentSegment, currentDocId, newDocId, recordInfo);
}
return newRecordLocation;
} else {
- if (segment != currentSegment) {
- _previousKeyToRecordLocationMap.put(primaryKey,
currentRecordLocation);
- }
// Out-of-order record
handleOutOfOrderEvent(currentRecordLocation.getComparisonValue(),
recordInfo.getComparisonValue());
isOutOfOrderRecord.set(true);
@@ -360,9 +405,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
} else {
// New primary key
addDocId(segment, validDocIds, queryableDocIds, newDocId,
recordInfo);
- RecordLocation newRecordLocation = new RecordLocation(segment,
newDocId, newComparisonValue);
- _newlyAddedKeys.put(primaryKey, newRecordLocation);
- return newRecordLocation;
+ return new RecordLocation(segment, newDocId, newComparisonValue);
}
});
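The gate described in the comment above — only save a previous location when the displaced record lives in an immutable segment — reduces to a one-liner, sketched here with boolean stand-ins for the Pinot type checks:

```java
public class ReplacePathGate {
  // Two mutable segments holding the same key implies an inconsistent
  // ParallelSegmentConsumptionPolicy, so such locations are not useful
  // revert targets; only immutable previous locations are tracked.
  public static boolean shouldTrackPrevious(boolean tableInconsistentDuringConsumption,
      boolean currentSegmentIsMutable) {
    return tableInconsistentDuringConsumption && !currentSegmentIsMutable;
  }
}
```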
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
index 6e42097ee9d..7535da9d3e7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
@@ -69,13 +69,10 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
final ConcurrentHashMap<Object,
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation>
_primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
- private final ConcurrentHashMap<Object,
-
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation>
+ @VisibleForTesting
+ final ConcurrentHashMap<Object,
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation>
_previousKeyToRecordLocationMap = new ConcurrentHashMap<>();
- private final Map<Object,
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation>
- _newlyAddedKeys = new ConcurrentHashMap<>();
-
// Used to initialize a reference to previous row for merging in partial
upsert
private final LazyRow _reusePreviousRow = new LazyRow();
private final Map<String, Object> _reuseMergeResultHolder = new HashMap<>();
@@ -124,13 +121,7 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
if (currentSegment == segment) {
if (comparisonResult >= 0) {
replaceDocId(segment, validDocIds, queryableDocIds,
currentDocId, newDocId, recordInfo);
- RecordLocation newRecordLocation = new
RecordLocation(segment,
- newDocId, newComparisonValue,
currentDistinctSegmentCount);
- // Track the record location of the newly added keys
- if (_newlyAddedKeys.containsKey(primaryKey)) {
- _newlyAddedKeys.put(primaryKey, newRecordLocation);
- }
- return newRecordLocation;
+ return new RecordLocation(segment, newDocId,
newComparisonValue, currentDistinctSegmentCount);
} else {
return currentRecordLocation;
}
@@ -153,9 +144,22 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
validDocIdsForOldSegment.remove(currentDocId);
}
}
+ if (_context.isTableTypeInconsistentDuringConsumption()
+ && currentSegment instanceof MutableSegment) {
+ _previousKeyToRecordLocationMap.remove(primaryKey);
+ }
return new RecordLocation(segment, newDocId,
newComparisonValue,
RecordLocation.incrementSegmentCount(currentDistinctSegmentCount));
} else {
+ if (_context.isTableTypeInconsistentDuringConsumption()) {
+ RecordLocation prevRecordLocation =
_previousKeyToRecordLocationMap.get(primaryKey);
+ if (currentSegment instanceof MutableSegment &&
(prevRecordLocation == null
+ ||
newComparisonValue.compareTo(prevRecordLocation.getComparisonValue()) >= 0)) {
+ RecordLocation newRecordLocation =
+ new RecordLocation(segment, newDocId,
newComparisonValue, currentDistinctSegmentCount);
+ _previousKeyToRecordLocationMap.put(primaryKey,
newRecordLocation);
+ }
+ }
return new RecordLocation(currentSegment, currentDocId,
currentComparisonValue,
RecordLocation.incrementSegmentCount(currentDistinctSegmentCount));
}
@@ -168,9 +172,21 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
numKeysInWrongSegment.getAndIncrement();
if (comparisonResult >= 0) {
addDocId(segment, validDocIds, queryableDocIds, newDocId,
recordInfo);
+ if (_context.isTableTypeInconsistentDuringConsumption() &&
currentSegment instanceof MutableSegment) {
+ _previousKeyToRecordLocationMap.remove(primaryKey);
+ }
return new RecordLocation(segment, newDocId,
newComparisonValue,
RecordLocation.incrementSegmentCount(currentDistinctSegmentCount));
} else {
+ if (_context.isTableTypeInconsistentDuringConsumption()) {
+ RecordLocation prevRecordLocation = _previousKeyToRecordLocationMap.get(primaryKey);
+ if (currentSegment instanceof MutableSegment && (prevRecordLocation == null
+ || newComparisonValue.compareTo(prevRecordLocation.getComparisonValue()) >= 0)) {
+ RecordLocation newRecordLocation =
+ new RecordLocation(segment, newDocId, newComparisonValue, currentDistinctSegmentCount);
+ _previousKeyToRecordLocationMap.put(primaryKey, newRecordLocation);
+ }
+ }
return currentRecordLocation;
}
}
@@ -183,18 +199,28 @@ public class ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
currentSegmentName, getAuthoritativeUpdateOrCreationTime(segment),
getAuthoritativeUpdateOrCreationTime(currentSegment)))) {
replaceDocId(segment, validDocIds, queryableDocIds, currentSegment, currentDocId, newDocId, recordInfo);
+ if (_context.isTableTypeInconsistentDuringConsumption() && currentSegment instanceof MutableSegment) {
+ _previousKeyToRecordLocationMap.remove(primaryKey);
+ }
return new RecordLocation(segment, newDocId, newComparisonValue,
RecordLocation.incrementSegmentCount(currentDistinctSegmentCount));
} else {
+ if (_context.isTableTypeInconsistentDuringConsumption()) {
+ RecordLocation prevRecordLocation = _previousKeyToRecordLocationMap.get(primaryKey);
+ if (currentSegment instanceof MutableSegment && (prevRecordLocation == null
+ || newComparisonValue.compareTo(prevRecordLocation.getComparisonValue()) >= 0)) {
+ RecordLocation newRecordLocation =
+ new RecordLocation(segment, newDocId, newComparisonValue, currentDistinctSegmentCount);
+ _previousKeyToRecordLocationMap.put(primaryKey, newRecordLocation);
+ }
+ }
return new RecordLocation(currentSegment, currentDocId, currentComparisonValue,
RecordLocation.incrementSegmentCount(currentDistinctSegmentCount));
}
} else {
// New primary key
addDocId(segment, validDocIds, queryableDocIds, newDocId, recordInfo);
- RecordLocation newRecordLocation = new RecordLocation(segment, newDocId, newComparisonValue, 1);
- _newlyAddedKeys.put(primaryKey, newRecordLocation);
- return newRecordLocation;
+ return new RecordLocation(segment, newDocId, newComparisonValue, 1);
}
});
}
@@ -217,16 +243,21 @@ public class ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
_logger.info("Removing {} segment: {}, current primary key count: {}",
segment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, getNumPrimaryKeys());
long startTimeMs = System.currentTimeMillis();
-
+ // For ConsistentDeletes, we need to iterate over ALL docs in the segment (not just valid ones)
+ // to properly decrement distinctSegmentCount for every key that was ever in the segment
try (PrimaryKeyReader primaryKeyReader = new PrimaryKeyReader(segment, _primaryKeyColumns)) {
- removeSegment(segment,
- UpsertUtils.getPrimaryKeyIterator(primaryKeyReader, segment.getSegmentMetadata().getTotalDocs()));
+ if (shouldRevertMetadataOnInconsistency(segment)) {
+ revertAndRemoveSegment(segment,
+ UpsertUtils.getRecordIterator(primaryKeyReader, segment.getSegmentMetadata().getTotalDocs()));
+ } else {
+ removeSegment(segment,
+ UpsertUtils.getPrimaryKeyIterator(primaryKeyReader, segment.getSegmentMetadata().getTotalDocs()));
+ }
} catch (Exception e) {
throw new RuntimeException(
String.format("Caught exception while removing segment: %s, table: %s", segment.getSegmentName(),
_tableNameWithType), e);
}
}
-
// Update metrics
long numPrimaryKeys = getNumPrimaryKeys();
updatePrimaryKeyGauge(numPrimaryKeys);
@@ -234,6 +265,20 @@ public class ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
}
+ protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds) {
+ try (PrimaryKeyReader primaryKeyReader = new PrimaryKeyReader(segment, _primaryKeyColumns)) {
+ if (shouldRevertMetadataOnInconsistency(segment)) {
+ revertAndRemoveSegment(segment, UpsertUtils.getRecordIterator(primaryKeyReader, validDocIds));
+ } else {
+ removeSegment(segment, UpsertUtils.getPrimaryKeyIterator(primaryKeyReader, validDocIds));
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Caught exception while removing segment: %s, table: %s, message: %s", segment.getSegmentName(),
+ _tableNameWithType, e.getMessage()), e);
+ }
+ }
+
@Override
public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
@Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, @Nullable Iterator<RecordInfo> recordInfoIterator,
@@ -258,8 +303,14 @@ public class ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
oldSegment, validDocIdsForOldSegment);
}
if (validDocIdsForOldSegment != null && !validDocIdsForOldSegment.isEmpty()) {
- checkForInconsistencies(segment, validDocIds, queryableDocIds, oldSegment, validDocIdsForOldSegment,
- segmentName);
+ if (_context.isTableTypeInconsistentDuringConsumption()) {
+ if (shouldRevertMetadataOnInconsistency(oldSegment)) {
+ revertSegmentUpsertMetadata(oldSegment, segmentName, validDocIdsForOldSegment);
+ return;
+ } else {
+ logInconsistentResults(segmentName, validDocIdsForOldSegment.getCardinality());
+ }
+ }
}
// we want to always remove a segment in case of enableDeletedKeysCompactionConsistency = true
// this is to account for the removal of primary-key in the to-be-removed segment and reduce
@@ -271,32 +322,87 @@ public class ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
}
@Override
- protected void removeNewlyAddedKeys(IndexSegment oldSegment) {
- int removedKeys = 0;
- for (Map.Entry<Object, RecordLocation> entry : _newlyAddedKeys.entrySet()) {
- if (entry.getValue().getSegment() == oldSegment) {
- _primaryKeyToRecordLocationMap.remove(entry.getKey());
- removeDocId(oldSegment, entry.getValue().getDocId());
- removedKeys++;
- }
+ protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey> primaryKeyIterator) {
+ // We need to decrease the distinctSegmentCount for each unique primary key in this deleting segment by 1
+ // as the occurrence of the key in this segment is being removed. We are taking a set of unique primary keys
+ // to avoid double counting the same key in the same segment.
+ Set<Object> uniquePrimaryKeys = new HashSet<>();
+ while (primaryKeyIterator.hasNext()) {
+ PrimaryKey primaryKey = primaryKeyIterator.next();
+ _primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey, _hashFunction),
+ (pk, recordLocation) -> {
+ if (recordLocation.getSegment() == segment) {
+ if (_context.isTableTypeInconsistentDuringConsumption() && segment instanceof MutableSegment) {
+ _previousKeyToRecordLocationMap.remove(pk);
+ }
+ return null;
+ }
+ if (!uniquePrimaryKeys.add(pk)) {
+ return recordLocation;
+ }
+ return new RecordLocation(recordLocation.getSegment(), recordLocation.getDocId(),
+ recordLocation.getComparisonValue(),
+ RecordLocation.decrementSegmentCount(recordLocation.getDistinctSegmentCount()));
+ });
}
- _logger.info("Removed newly added {} keys for the segment: {} out of : {}", removedKeys,
- oldSegment.getSegmentName(), _previousKeyToRecordLocationMap.size());
}
@Override
- protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey> primaryKeyIterator) {
+ protected void revertAndRemoveSegment(IndexSegment segment,
+ Iterator<Map.Entry<Integer, PrimaryKey>> primaryKeyIterator) {
// We need to decrease the distinctSegmentCount for each unique primary key in this deleting segment by 1
// as the occurrence of the key in this segment is being removed. We are taking a set of unique primary keys
// to avoid double counting the same key in the same segment.
Set<Object> uniquePrimaryKeys = new HashSet<>();
while (primaryKeyIterator.hasNext()) {
- PrimaryKey primaryKey = primaryKeyIterator.next();
+ Map.Entry<Integer, PrimaryKey> primaryKeyEntry = primaryKeyIterator.next();
+ PrimaryKey primaryKey = primaryKeyEntry.getValue();
+ int docId = primaryKeyEntry.getKey();
_primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey, _hashFunction),
(pk, recordLocation) -> {
- if (recordLocation.getSegment() == segment) {
+ RecordLocation prevLocation = _previousKeyToRecordLocationMap.get(pk);
+ if (prevLocation == null) {
+ _previousKeyToRecordLocationMap.remove(pk);
return null;
}
+ if (recordLocation.getSegment() == segment) {
+ // Revert to previous segment location
+ IndexSegment prevSegment = prevLocation.getSegment();
+ ThreadSafeMutableRoaringBitmap prevValidDocIds = prevSegment.getValidDocIds();
+ if (prevValidDocIds != null) {
+ try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(prevSegment,
+ _primaryKeyColumns, _comparisonColumns, _deleteRecordColumn)) {
+ int prevDocId = prevLocation.getDocId();
+ RecordInfo recordInfo = recordInfoReader.getRecordInfo(prevDocId);
+ replaceDocId(prevSegment, prevValidDocIds, prevSegment.getQueryableDocIds(), segment, docId,
+ prevDocId, recordInfo);
+ _previousKeyToRecordLocationMap.remove(pk);
+ if (!uniquePrimaryKeys.add(pk)) {
+ return prevLocation;
+ }
+ return new RecordLocation(prevLocation.getSegment(), prevLocation.getDocId(),
+ prevLocation.getComparisonValue(),
+ RecordLocation.decrementSegmentCount(prevLocation.getDistinctSegmentCount()));
+ } catch (IOException e) {
+ _logger.warn("Failed to revert to previous segment: {}, removing key", prevSegment.getSegmentName(),
+ e);
+ _previousKeyToRecordLocationMap.remove(pk);
+ return null;
+ }
+ } else {
+ _previousKeyToRecordLocationMap.remove(pk);
+ return null;
+ }
+ } else if (recordLocation.getSegment() instanceof ImmutableSegmentImpl) {
+ // The consuming segment's key is in a different immutable segment
+ _previousKeyToRecordLocationMap.remove(pk);
+ } else {
+ _logger.warn(
+ "Consuming segment {} has already ingested the primary key for docId {} from segment {}, suggesting"
+ + " that consumption is occurring concurrently with segment replacement, which is undesirable "
+ + "for consistency.", recordLocation.getSegment().getSegmentName(), primaryKeyEntry.getKey(),
+ segment.getSegmentName());
+ }
if (!uniquePrimaryKeys.add(pk)) {
return recordLocation;
}
@@ -369,42 +475,13 @@ public class ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
}
@Override
- protected void revertCurrentSegmentUpsertMetadata(IndexSegment oldSegment, ThreadSafeMutableRoaringBitmap validDocIds,
- ThreadSafeMutableRoaringBitmap queryableDocIds) {
- _logger.info("Reverting Upsert metadata for {} keys", _previousKeyToRecordLocationMap.size());
- // Revert to previous locations present in other segment
- for (Map.Entry<Object, RecordLocation> obj : _previousKeyToRecordLocationMap.entrySet()) {
- IndexSegment prevSegment = obj.getValue().getSegment();
- RecordLocation currentLocation = _primaryKeyToRecordLocationMap.get(obj.getKey());
- if (prevSegment != null) {
- if (currentLocation != null && currentLocation.getSegment() == oldSegment) {
- try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(prevSegment,
- _primaryKeyColumns, _comparisonColumns, _deleteRecordColumn)) {
- int newDocId = obj.getValue().getDocId();
- int currentDocId = _primaryKeyToRecordLocationMap.get(obj.getKey()).getDocId();
- RecordInfo recordInfo = recordInfoReader.getRecordInfo(newDocId);
- // Update valid docId to the other segment location
- replaceDocId(prevSegment, prevSegment.getValidDocIds(), prevSegment.getQueryableDocIds(), oldSegment,
- currentDocId, newDocId, recordInfo);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- _primaryKeyToRecordLocationMap.put(obj.getKey(), obj.getValue());
- }
- } else {
- _primaryKeyToRecordLocationMap.remove(obj.getKey());
- }
- }
- _logger.info("Reverted Upsert metadata of {} keys to previous segment locations for the segment: {}",
- _previousKeyToRecordLocationMap.size(), oldSegment.getSegmentName());
- // For the newly added keys into the segment, remove the pk and valid doc id
- removeNewlyAddedKeys(oldSegment);
+ protected int getPrevKeyToRecordLocationSize() {
+ return _previousKeyToRecordLocationMap.size();
}
@Override
- protected void eraseKeyToPreviousLocationMap() {
+ protected void clearPrevKeyToRecordLocation() {
_previousKeyToRecordLocationMap.clear();
- _newlyAddedKeys.clear();
}
@Override
@@ -432,15 +509,16 @@ public class ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
int currentDocId = currentRecordLocation.getDocId();
if (segment == currentSegment) {
replaceDocId(segment, validDocIds, queryableDocIds, currentDocId, newDocId, recordInfo);
- RecordLocation newRecordLocation = new RecordLocation(segment, newDocId, newComparisonValue,
+ return new RecordLocation(segment, newDocId, newComparisonValue,
currentRecordLocation.getDistinctSegmentCount());
- // Track the record location of the newly added keys
- if (_newlyAddedKeys.containsKey(primaryKey)) {
- _newlyAddedKeys.put(primaryKey, newRecordLocation);
- }
- return newRecordLocation;
} else {
- _previousKeyToRecordLocationMap.put(primaryKey, currentRecordLocation);
+ // For consistent ParallelSegmentConsumptionPolicy like DISALLOW_ALWAYS and ALLOW_DURING_BUILD_ONLY,
+ // we shouldn't be seeing two mutable segments, so ideally current segment shouldn't point to
+ // mutable segment, unless other modes are enabled which could lead to inconsistent behaviour
+ if (_context.isTableTypeInconsistentDuringConsumption()
+ && !(currentSegment instanceof MutableSegment)) {
+ _previousKeyToRecordLocationMap.put(primaryKey, currentRecordLocation);
+ }
replaceDocId(segment, validDocIds, queryableDocIds, currentSegment, currentDocId, newDocId, recordInfo);
return new RecordLocation(segment, newDocId, newComparisonValue,
RecordLocation.incrementSegmentCount(currentRecordLocation.getDistinctSegmentCount()));
@@ -461,9 +539,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
} else {
// New primary key
addDocId(segment, validDocIds, queryableDocIds, newDocId, recordInfo);
- RecordLocation newRecordLocation = new RecordLocation(segment, newDocId, newComparisonValue, 1);
- _newlyAddedKeys.put(primaryKey, newRecordLocation);
- return newRecordLocation;
+ return new RecordLocation(segment, newDocId, newComparisonValue, 1);
}
});
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
index 6ef76c8a66d..7bbb4d98143 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
@@ -192,6 +192,16 @@ public class UpsertContext {
return _tableIndexDir;
}
+ /**
+ * Returns true if the table configuration has settings that can lead to inconsistent upsert metadata
+ * during segment replacement after force commit. This happens when:
+ * - Partial upsert is enabled (records need to be merged with previous values)
+ * - dropOutOfOrderRecord is enabled with NONE consistency mode (records may have been dropped)
+ */
+ public boolean isTableTypeInconsistentDuringConsumption() {
+ return _dropOutOfOrderRecord || _outOfOrderRecordColumn != null || _partialUpsertHandler != null;
+ }
+
@Override
public String toString() {
return new ToStringBuilder(this)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
index bedea992fd6..b508a02aeb0 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
@@ -132,7 +133,6 @@ public class UpsertUtils {
MutableRoaringBitmap validDocIds) {
return new Iterator<>() {
private final PeekableIntIterator _docIdIterator = validDocIds.getIntIterator();
-
@Override
public boolean hasNext() {
return _docIdIterator.hasNext();
@@ -165,6 +165,46 @@ public class UpsertUtils {
};
}
+ public static Iterator<Map.Entry<Integer, PrimaryKey>> getRecordIterator(PrimaryKeyReader primaryKeyReader,
+ MutableRoaringBitmap validDocIds) {
+
+ return new Iterator<>() {
+ private final PeekableIntIterator _docIdIterator = validDocIds.getIntIterator();
+
+ @Override
+ public boolean hasNext() {
+ return _docIdIterator.hasNext();
+ }
+
+ @Override
+ public Map.Entry<Integer, PrimaryKey> next() {
+ int docId = _docIdIterator.next();
+ return Map.entry(docId, primaryKeyReader.getPrimaryKey(docId));
+ }
+ };
+ }
+
+ /**
+ * Returns an iterator of docId and {@link PrimaryKey} for all the documents from the segment.
+ */
+ public static Iterator<Map.Entry<Integer, PrimaryKey>> getRecordIterator(PrimaryKeyReader primaryKeyReader,
+ int numDocs) {
+ return new Iterator<>() {
+ private int _docId = 0;
+
+ @Override
+ public boolean hasNext() {
+ return _docId < numDocs;
+ }
+
+ @Override
+ public Map.Entry<Integer, PrimaryKey> next() {
+ int docId = _docId++;
+ return Map.entry(docId, primaryKeyReader.getPrimaryKey(docId));
+ }
+ };
+ }
+
public static class RecordInfoReader implements Closeable {
private final PrimaryKeyReader _primaryKeyReader;
private final ComparisonColumnReader _comparisonColumnReader;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index e8ee6c588e3..80523c83f37 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -1795,15 +1795,16 @@ public final class TableConfigUtils {
* @param tableConfig the table config to check, may be null
* @return true if the table has inconsistent state configs, false if tableConfig is null or no issues found
*/
- public static boolean checkForInconsistentStateConfigs(@Nullable TableConfig tableConfig) {
+ public static boolean isTableTypeInconsistentDuringConsumption(@Nullable TableConfig tableConfig) {
+ if (tableConfig == null) {
+ return false;
+ }
UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
if (upsertConfig == null) {
return false;
}
- return tableConfig.getReplication() > 1 && (
- upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL
- || (upsertConfig.isDropOutOfOrderRecord()
- && upsertConfig.getConsistencyMode() == UpsertConfig.ConsistencyMode.NONE));
+ return (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL || upsertConfig.isDropOutOfOrderRecord()
+ || upsertConfig.getOutOfOrderRecordColumn() != null);
}
// enum of all the skip-able validation types.
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
index c07d3e41cd1..8435d013978 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
@@ -178,24 +178,21 @@ public class BasePartitionUpsertMetadataManagerTest {
List<String> segmentsTakenSnapshot = new ArrayList<>();
File segDir01 = new File(TEMP_DIR, "seg01");
- ImmutableSegmentImpl seg01 =
- createImmutableSegment("seg01", segDir01, segmentsTakenSnapshot, new ArrayList<>());
+ ImmutableSegmentImpl seg01 = createImmutableSegment("seg01", segDir01, segmentsTakenSnapshot, new ArrayList<>());
seg01.enableUpsert(upsertMetadataManager, createDocIds(0, 1, 2, 3), null);
upsertMetadataManager.addSegment(seg01);
// seg01 has a tmp snapshot file, but no snapshot file
FileUtils.touch(new File(segDir01, V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME + "_tmp"));
File segDir02 = new File(TEMP_DIR, "seg02");
- ImmutableSegmentImpl seg02 =
- createImmutableSegment("seg02", segDir02, segmentsTakenSnapshot, new ArrayList<>());
+ ImmutableSegmentImpl seg02 = createImmutableSegment("seg02", segDir02, segmentsTakenSnapshot, new ArrayList<>());
seg02.enableUpsert(upsertMetadataManager, createDocIds(0, 1, 2, 3, 4, 5), null);
upsertMetadataManager.addSegment(seg02);
// seg02 has snapshot file, so its snapshot is taken first.
FileUtils.touch(new File(segDir02, V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME));
File segDir03 = new File(TEMP_DIR, "seg03");
- ImmutableSegmentImpl seg03 =
- createImmutableSegment("seg03", segDir03, segmentsTakenSnapshot, new ArrayList<>());
+ ImmutableSegmentImpl seg03 = createImmutableSegment("seg03", segDir03, segmentsTakenSnapshot, new ArrayList<>());
seg03.enableUpsert(upsertMetadataManager, createDocIds(3, 4, 7), null);
upsertMetadataManager.addSegment(seg03);
@@ -369,8 +366,7 @@ public class BasePartitionUpsertMetadataManagerTest {
File segDir01 = new File(TEMP_DIR, "seg01");
ImmutableSegmentImpl seg01 =
- createImmutableSegment("seg01", segDir01, validDocIdsSegmentsTaken,
- queryableDocIdsSegmentsTaken);
+ createImmutableSegment("seg01", segDir01, validDocIdsSegmentsTaken, queryableDocIdsSegmentsTaken);
ThreadSafeMutableRoaringBitmap queryableDocIds01 = createDocIds(0, 1, 2); // Some docs excluded due to deletes
seg01.enableUpsert(upsertMetadataManager, createDocIds(0, 1, 2, 3), queryableDocIds01);
upsertMetadataManager.trackSegment(seg01);
@@ -381,8 +377,7 @@ public class BasePartitionUpsertMetadataManagerTest {
File segDir02 = new File(TEMP_DIR, "seg02");
ImmutableSegmentImpl seg02 =
- createImmutableSegment("seg02", segDir02, validDocIdsSegmentsTaken,
- queryableDocIdsSegmentsTaken);
+ createImmutableSegment("seg02", segDir02, validDocIdsSegmentsTaken, queryableDocIdsSegmentsTaken);
ThreadSafeMutableRoaringBitmap queryableDocIds02 = createDocIds(0, 2, 3, 5); // Some docs excluded due to deletes
seg02.enableUpsert(upsertMetadataManager, createDocIds(0, 1, 2, 3, 4, 5), queryableDocIds02);
upsertMetadataManager.trackSegment(seg02);
@@ -393,8 +388,7 @@ public class BasePartitionUpsertMetadataManagerTest {
File segDir03 = new File(TEMP_DIR, "seg03");
ImmutableSegmentImpl seg03 =
- createImmutableSegment("seg03", segDir03, validDocIdsSegmentsTaken,
- queryableDocIdsSegmentsTaken);
+ createImmutableSegment("seg03", segDir03, validDocIdsSegmentsTaken, queryableDocIdsSegmentsTaken);
ThreadSafeMutableRoaringBitmap queryableDocIds03 = createDocIds(3, 7); // Some docs excluded due to deletes
seg03.enableUpsert(upsertMetadataManager, createDocIds(3, 4, 7), queryableDocIds03);
upsertMetadataManager.trackSegment(seg03);
@@ -1050,10 +1044,6 @@ public class BasePartitionUpsertMetadataManagerTest {
return false;
}
- @Override
- protected void removeNewlyAddedKeys(IndexSegment oldSegment) {
- }
-
@Override
protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds) {
}
@@ -1063,17 +1053,26 @@ public class BasePartitionUpsertMetadataManagerTest {
return null;
}
+ @Override
+ protected void revertAndRemoveSegment(IndexSegment segment,
+ Iterator<Map.Entry<Integer, PrimaryKey>> primaryKeyIterator) {
+ }
+
+ @Override
+ protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey> primaryKeyIterator) {
+ }
+
@Override
protected void doRemoveExpiredPrimaryKeys() {
}
@Override
- protected void revertCurrentSegmentUpsertMetadata(IndexSegment oldSegment,
- ThreadSafeMutableRoaringBitmap validDocIds, ThreadSafeMutableRoaringBitmap queryableDocIds) {
+ protected int getPrevKeyToRecordLocationSize() {
+ return 0;
}
@Override
- protected void eraseKeyToPreviousLocationMap() {
+ protected void clearPrevKeyToRecordLocation() {
}
}
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 9e3beaf4bbb..681f7d53bc3 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -66,6 +66,7 @@ import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.BytesUtils;
+import org.apache.pinot.spi.utils.ConsumingSegmentConsistencyModeListener;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -1968,7 +1969,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
}
@Test
- public void testPartialUpsertOldSegmentTriggerReversion() throws IOException
{
+ public void testPartialUpsertOldSegmentTriggerReversion()
+ throws IOException {
// Test partial upserts with consuming (mutable) segment being sealed - revert should be triggered
// Note: Revert logic only applies when sealing a consuming segment, not for immutable segment replacement
PartialUpsertHandler mockPartialUpsertHandler = mock(PartialUpsertHandler.class);
@@ -1999,8 +2001,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
int[] timestamps2 = new int[]{150, 350};
ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
- ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1, validDocIds2, null,
- primaryKeysList2, timestamps2);
+ ImmutableSegmentImpl segment2 =
+ mockImmutableSegmentWithTimestamps(1, validDocIds2, null, primaryKeysList2, timestamps2);
// Replace mutable with immutable (consuming segment seal) - revert SHOULD be triggered
upsertMetadataManager.replaceSegment(segment2, validDocIds2, null,
@@ -2012,8 +2014,6 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
checkRecordLocation(recordLocationMap, 1, segment2, 0, 150, HashFunction.NONE);
checkRecordLocation(recordLocationMap, 3, segment2, 1, 350, HashFunction.NONE);
- // Mutable segment's validDocIds should be 0 after removal
- assertEquals(validDocIds1.getMutableRoaringBitmap().getCardinality(), 0);
upsertMetadataManager.stop();
upsertMetadataManager.close();
}
@@ -2148,7 +2148,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
// Create second real segment with 2 records (subset of first)
int[] primaryKeys2 = new int[]{10, 30};
- int[] timestamps2 = new int[]{1500, 3500};
+ int[] timestamps2 = new int[]{1600, 3600};
ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
ImmutableSegmentImpl segment2 = createRealSegment(segmentName, primaryKeys2, timestamps2, validDocIds2);
@@ -2156,8 +2156,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
upsertMetadataManager.replaceSegment(segment2, segment1);
assertEquals(recordLocationMap.size(), 2);
- checkRecordLocation(recordLocationMap, 10, segment2, 0, 1500, HashFunction.NONE);
- checkRecordLocation(recordLocationMap, 30, segment2, 1, 3500, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 10, segment2, 0, 1600, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 30, segment2, 1, 3600, HashFunction.NONE);
assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 2);
upsertMetadataManager.stop();
@@ -2262,6 +2262,62 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
upsertMetadataManager.close();
}
+ @Test
+ public void testProtectedModeRevertsMetadataForConsumingSegmentSeal()
+ throws IOException {
+ UpsertContext upsertContext =
+ _contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).setDropOutOfOrderRecord(true).build();
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+ Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
+
+ int[] mutablePrimaryKeys = new int[]{10, 20, 30};
+ ThreadSafeMutableRoaringBitmap validDocIdsMutable = new ThreadSafeMutableRoaringBitmap();
+ MutableSegment mutableSegment = mockMutableSegmentWithDataSource(1, validDocIdsMutable, null, mutablePrimaryKeys);
+
+ upsertMetadataManager.addRecord(mutableSegment,
+ new RecordInfo(makePrimaryKey(10), 0, Integer.valueOf(1000), false));
+ upsertMetadataManager.addRecord(mutableSegment,
+ new RecordInfo(makePrimaryKey(20), 1, Integer.valueOf(2000), false));
+ upsertMetadataManager.addRecord(mutableSegment,
+ new RecordInfo(makePrimaryKey(30), 2, Integer.valueOf(3000), false));
+
+ Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+ assertEquals(recordLocationMap.size(), 3);
+ assertEquals(validDocIdsMutable.getMutableRoaringBitmap().getCardinality(), 3);
+ trackedSegments.add(mutableSegment);
+
+ int numRecords = 3;
+ int[] primaryKeys = new int[]{10, 20, 30};
+ int[] timestamps = new int[]{900, 1900, 3500};
+ ThreadSafeMutableRoaringBitmap validDocIdsImmutable = new ThreadSafeMutableRoaringBitmap();
+ List<PrimaryKey> primaryKeysList = getPrimaryKeyList(numRecords, primaryKeys);
+ ImmutableSegmentImpl immutableSegment = mockImmutableSegmentWithTimestamps(1, validDocIdsImmutable, null,
+ primaryKeysList, timestamps);
+ trackedSegments.add(immutableSegment);
+
+ ConsumingSegmentConsistencyModeListener consistencyModeListener =
+ ConsumingSegmentConsistencyModeListener.getInstance();
+ consistencyModeListener.setMode(ConsumingSegmentConsistencyModeListener.Mode.PROTECTED);
+ try {
+ upsertMetadataManager.replaceSegment(immutableSegment, validDocIdsImmutable, null,
+ getRecordInfoListWithIntegerComparison(numRecords, primaryKeys, timestamps, null).iterator(), mutableSegment);
+ } finally {
+ consistencyModeListener.reset();
+ }
+
+ assertEquals(recordLocationMap.size(), 3);
+ checkRecordLocation(recordLocationMap, 10, immutableSegment, 0, 900, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 20, immutableSegment, 1, 1900, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 30, immutableSegment, 2, 3500, HashFunction.NONE);
+
+ assertTrue(upsertMetadataManager._previousKeyToRecordLocationMap.isEmpty());
+ assertEquals(validDocIdsImmutable.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
+
+ upsertMetadataManager.stop();
+ upsertMetadataManager.close();
+ }
+
@Test
public void testNoRevertForImmutableSegmentReplacement()
throws IOException {
@@ -2300,12 +2356,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1, validDocIds2, null,
primaryKeysList2, timestamps2);
- long startTime = System.currentTimeMillis();
upsertMetadataManager.replaceSegment(segment2, validDocIds2, null,
getRecordInfoListWithIntegerComparison(numRecords2, primaryKeys2, timestamps2, null).iterator(), segment1);
- long duration = System.currentTimeMillis() - startTime;
-
- assertTrue(duration < 1000, "Immutable-to-immutable replacement should complete quickly, took: " + duration + "ms");
assertEquals(recordLocationMap.size(), 1);
checkRecordLocation(recordLocationMap, 10, segment2, 0, 1500, HashFunction.NONE);
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 7010bd16032..f95f7f64d3b 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -84,7 +84,6 @@ import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager;
import org.apache.pinot.core.data.manager.realtime.ServerRateLimitConfigChangeListener;
-import org.apache.pinot.core.data.manager.realtime.UpsertInconsistentStateConfig;
import org.apache.pinot.core.instance.context.ServerContext;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.core.transport.ListenerConfig;
@@ -132,6 +131,7 @@ import org.apache.pinot.spi.utils.CommonConstants.Helix.Instance;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel;
import org.apache.pinot.spi.utils.CommonConstants.Server;
import org.apache.pinot.spi.utils.CommonConstants.Server.SegmentCompletionProtocol;
+import org.apache.pinot.spi.utils.ConsumingSegmentConsistencyModeListener;
import org.apache.pinot.spi.utils.InstanceTypeUtils;
import org.apache.pinot.spi.utils.NetUtils;
import org.apache.pinot.spi.utils.PinotMd5Mode;
@@ -287,8 +287,9 @@ public abstract class BaseServerStarter implements ServiceStartable {
LOGGER.info("Registered ClusterConfigForTable change listener");
// Register configuration change listener for upsert force commit/reload disable setting
_clusterConfigChangeHandler.registerClusterConfigChangeListener(
- UpsertInconsistentStateConfig.getInstance());
- LOGGER.info("Registered UpsertInconsistentStateConfig change listener for dynamic force commit/reload control");
+ ConsumingSegmentConsistencyModeListener.getInstance());
+ LOGGER.info(
+ "Registered ConsumingSegmentConsistencyModeListener change listener for dynamic force commit/reload control");
LOGGER.info("Initializing Helix manager with zkAddress: {}, clusterName: {}, instanceId: {}", _zkAddress,
_helixClusterName, _instanceId);
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 6dee647e915..9d7464cca97 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -56,13 +56,13 @@ import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender;
import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
-import org.apache.pinot.core.data.manager.realtime.UpsertInconsistentStateConfig;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.local.utils.SegmentOperationsThrottlerSet;
import org.apache.pinot.segment.local.utils.SegmentReloadSemaphore;
import org.apache.pinot.segment.local.utils.ServerReloadJobStatusCache;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
@@ -73,6 +73,7 @@ import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.utils.ConsumingSegmentConsistencyModeListener;
import org.apache.pinot.spi.utils.TimestampIndexUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.zookeeper.data.Stat;
@@ -549,26 +550,28 @@ public class HelixInstanceDataManager implements InstanceDataManager {
tableNameWithType, segmentNames));
TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType);
if (tableDataManager != null) {
+ // Use cached table config for performance - properties we check (replication, upsert mode)
+ // rarely change after table creation, and cache is refreshed on reload
+ TableConfig tableConfig = tableDataManager.getCachedTableConfigAndSchema().getLeft();
+ boolean isTableTypeInconsistentDuringConsumption =
+ TableConfigUtils.isTableTypeInconsistentDuringConsumption(tableConfig);
+ ConsumingSegmentConsistencyModeListener config = ConsumingSegmentConsistencyModeListener.getInstance();
+
+ // Only restrict force commit for tables with inconsistent state configs
+ // (partial upsert or dropOutOfOrderRecord=true with replication > 1)
+ // when the mode is RESTRICTED, the default (isForceCommitAllowed = false)
+ if (isTableTypeInconsistentDuringConsumption && !config.isForceCommitAllowed()) {
+ LOGGER.warn("Force commit disabled for table: {} due to inconsistent state config. "
+ + "Change the config to `PROTECTED` via cluster config: {}", tableNameWithType, config.getConfigKey());
+ return;
+ }
+
+
segmentNames.forEach(segName -> {
SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segName);
if (segmentDataManager != null) {
try {
if (segmentDataManager instanceof RealtimeSegmentDataManager) {
- // Force-committing consuming segments is enabled by default.
- // For partial-upsert tables or upserts with out-of-order events enabled (notably when replication > 1),
- // winner selection could incorrectly favor replicas with fewer consumed rows.
- // This triggered unnecessary reconsumption and resulted in inconsistent upsert state.
- // The fix restores correct segment metadata before winner selection.
- // Force commit behavior can be toggled dynamically using the cluster config
- // `pinot.server.upsert.force.commit.reload` without restarting servers.
- TableConfig tableConfig = tableDataManager.getCachedTableConfigAndSchema().getLeft();
- UpsertInconsistentStateConfig config = UpsertInconsistentStateConfig.getInstance();
- if (config.isForceCommitReloadAllowed(tableConfig)) {
- ((RealtimeSegmentDataManager) segmentDataManager).forceCommit();
- } else {
- LOGGER.warn("Force commit disabled for table: {} due to inconsistent state config. "
- + "Control via cluster config: {}", tableNameWithType, config.getConfigKey());
- }
+ ((RealtimeSegmentDataManager) segmentDataManager).forceCommit();
}
} finally {
tableDataManager.releaseSegment(segmentDataManager);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 88b9bc30520..d9f1987fd92 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -2183,15 +2183,14 @@ public class CommonConstants {
*/
public static class ConfigChangeListenerConstants {
/**
- * Cluster config key to control whether force commit/reload is allowed for upsert tables
- * with inconsistent state configurations (partial upsert or dropOutOfOrderRecord=true
- * with consistency mode NONE and replication > 1).
- */
- public static final String FORCE_COMMIT_RELOAD_CONFIG = "pinot.server.upsert.force.commit.reload";
-
- /**
- * Default value: true (force commit/reload is allowed by default).
+ * Cluster config key to control how to handle inconsistency during consuming segment commit
+ * for upsert/dedup tables (partial upsert or dropOutOfOrderRecord=true with consistency mode).
+ *
+ * Supported values:
+ * - RESTRICTED: Force commit is disabled for tables with inconsistent state table configurations
+ * - PROTECTED: Force commit is enabled with metadata reversion on inconsistencies
+ * - UNSAFE: Force commit is enabled without metadata reversion (can lead to inconsistencies)
*/
- public static final boolean DEFAULT_FORCE_COMMIT_RELOAD = true;
+ public static final String CONSUMING_SEGMENT_CONSISTENCY_MODE = "pinot.server.consuming.segment.consistency.mode";
}
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ConsumingSegmentConsistencyModeListener.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ConsumingSegmentConsistencyModeListener.java
new file mode 100644
index 00000000000..869754a266c
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ConsumingSegmentConsistencyModeListener.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
+import org.apache.pinot.spi.utils.CommonConstants.ConfigChangeListenerConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Singleton class to manage the configuration for force commit on consuming segments
+ * for upsert tables with inconsistent state configurations (partial upsert, dropOutOfOrderRecord=true, or
+ * outOfOrderColumn). By default, force commit and reload are disabled.
+ * This configuration is dynamically updatable via ZK cluster config without requiring a server restart.
+ */
+public class ConsumingSegmentConsistencyModeListener implements PinotClusterConfigChangeListener {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConsumingSegmentConsistencyModeListener.class);
+ private static final ConsumingSegmentConsistencyModeListener INSTANCE = new ConsumingSegmentConsistencyModeListener();
+
+ public enum Mode {
+ /**
+ * Force commit is disabled for tables with inconsistent state configurations.
+ * Safe option that prevents potential data inconsistency issues.
+ */
+ RESTRICTED(false),
+
+ /**
+ * Force commit is enabled but tables with partial upsert or dropOutOfOrderRecord=true (with replication > 1)
+ * will have their upsert metadata reverted when inconsistencies are detected.
+ */
+ PROTECTED(true),
+
+ /**
+ * Force commit is enabled for all tables regardless of their configuration.
+ * Use with caution as this may cause data inconsistency for partial-upsert tables
+ * or upsert tables with dropOutOfOrderRecord/outOfOrderRecordColumn enabled when replication > 1.
+ * Inconsistency checks and metadata revert are skipped.
+ */
+ UNSAFE(true);
+
+ public static final Mode DEFAULT_CONSUMING_SEGMENT_CONSISTENCY_MODE = RESTRICTED;
+
+ private final boolean _forceCommitAllowed;
+
+ Mode(boolean forceCommitAllowed) {
+ _forceCommitAllowed = forceCommitAllowed;
+ }
+
+ public boolean isForceCommitAllowed() {
+ return _forceCommitAllowed;
+ }
+
+ public static Mode fromString(String value) {
+ if (value == null || value.trim().isEmpty()) {
+ return DEFAULT_CONSUMING_SEGMENT_CONSISTENCY_MODE;
+ }
+ String normalized = value.trim().toUpperCase();
+ try {
+ return Mode.valueOf(normalized);
+ } catch (IllegalArgumentException e) {
+ return DEFAULT_CONSUMING_SEGMENT_CONSISTENCY_MODE;
+ }
+ }
+ }
+
+ private final AtomicReference<Mode> _consistencyMode =
+ new AtomicReference<>(Mode.DEFAULT_CONSUMING_SEGMENT_CONSISTENCY_MODE);
+
+ private ConsumingSegmentConsistencyModeListener() {
+ }
+
+ public static ConsumingSegmentConsistencyModeListener getInstance() {
+ return INSTANCE;
+ }
+
+ public boolean isForceCommitAllowed() {
+ return _consistencyMode.get().isForceCommitAllowed();
+ }
+
+ public Mode getConsistencyMode() {
+ return _consistencyMode.get();
+ }
+
+ public String getConfigKey() {
+ return ConfigChangeListenerConstants.CONSUMING_SEGMENT_CONSISTENCY_MODE;
+ }
+
+ @Override
+ public void onChange(Set<String> changedConfigs, Map<String, String>
clusterConfigs) {
+ if (!changedConfigs.contains(ConfigChangeListenerConstants.CONSUMING_SEGMENT_CONSISTENCY_MODE)) {
+ return;
+ }
+
+ String configValue = clusterConfigs.get(ConfigChangeListenerConstants.CONSUMING_SEGMENT_CONSISTENCY_MODE);
+ Mode newMode = Mode.fromString(configValue);
+
+ Mode previousMode = _consistencyMode.getAndSet(newMode);
+ if (previousMode != newMode) {
+ LOGGER.info("Updated cluster config: {} from {} to {}",
+ ConfigChangeListenerConstants.CONSUMING_SEGMENT_CONSISTENCY_MODE, previousMode, newMode);
+ }
+ }
+
+ @VisibleForTesting
+ public void setMode(Mode mode) {
+ _consistencyMode.set(mode);
+ }
+
+ @VisibleForTesting
+ public void reset() {
+ _consistencyMode.set(Mode.DEFAULT_CONSUMING_SEGMENT_CONSISTENCY_MODE);
+ }
+}
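[Editorial note] The new listener's `fromString` parsing is deliberately lenient: null, empty, and unrecognized values all fall back to the RESTRICTED default rather than throwing. A minimal standalone sketch of that fallback behavior follows; the enum and class below are local stand-ins for illustration only, not the Pinot classes from this diff:

```java
public class ConsistencyModeSketch {
  // Local stand-in for the three modes added in this commit; the boolean
  // mirrors isForceCommitAllowed() from the diff.
  enum Mode {
    RESTRICTED(false), PROTECTED(true), UNSAFE(true);

    final boolean _forceCommitAllowed;

    Mode(boolean forceCommitAllowed) {
      _forceCommitAllowed = forceCommitAllowed;
    }
  }

  // Mirrors the Mode.fromString logic: trim, uppercase, and fall back to
  // the RESTRICTED default on null, empty, or unknown input.
  static Mode fromString(String value) {
    if (value == null || value.trim().isEmpty()) {
      return Mode.RESTRICTED;
    }
    try {
      return Mode.valueOf(value.trim().toUpperCase());
    } catch (IllegalArgumentException e) {
      return Mode.RESTRICTED;
    }
  }

  public static void main(String[] args) {
    System.out.println(fromString(" protected "));  // PROTECTED
    System.out.println(fromString(null));           // RESTRICTED
    System.out.println(fromString("bogus"));        // RESTRICTED
  }
}
```

Because bad values degrade to RESTRICTED (force commit disabled), a typo in the cluster config key's value fails safe rather than silently enabling UNSAFE behavior.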
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]