This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 079206a1bcb Use a single map to store and update the previous Record location keys when reverting Upsert Metadata (#17503)
079206a1bcb is described below

commit 079206a1bcb94c20ff6336bf9ab96d73f5dde1c4
Author: Chaitanya Deepthi <[email protected]>
AuthorDate: Fri Mar 13 00:59:11 2026 -0700

    Use a single map to store and update the previous Record location keys when reverting Upsert Metadata (#17503)
    
    * Update AsyncInstanceTable.tsx
    
    * #15162 - Fix Segments going into ERROR state for replicas
    
    * Modify and use a single map instead of 2 maps when reverting the metadata
    
    * Add a check for the consuming segment
    
    * Simplify the code
    
    * Fix test failure
    
    * Make removal of entry.getKey() in _previousKeyToRecordLocationMap consistent
    
    * Consistent code
    
    * Modify comments
    
    * Fix review comments and make the conditions much simpler to understand
    
    * Make the removal and revert to be in the same method
    
    * Remove eraseKeyToPreviousLocationMap
    
    * Review comments and code changes to add in three modes while reverting and reloading
    
    * Remove unused method
    
    * Change the method name
    
    * Keep the boolean for backward compatibility
    
    * Change the log
    
    * Spotless fixes
    
    * Check for conditions while doing reload and force commit
    
    * Check for inconsistent configs
    
    * Remove the unused code
    
    * Change the variable names
    
    * Change the variable and server config name to something meaningful
    
    * Change the path
    
    * Remove warn logs to not flood with messages
    
    * change the default config name
    
    * should revert on metadata inconsistencies
    
    * Make the constants deprecated instead of changing the names
    
    * Checkstyle fixes
    
    * Edit the comments
    
    * Change the comment
    
    * Remove comments and update the condition to add the keys when current location is in Mutable Segment
    
    * Change the comment
    
    * Extract to a single method instead of having the same logic in multiple classes
    
    * Have a proper OR condition
    
    * Add logs when there are inconsistencies for any mode
    
    * Simplify the method
    
    * Change the conditions
    
    * Change the record locations
    
    * Modify the comments
    
    * Change the conditions
    
    * Change the configs
    
    * checkstyle changes
    
    * checkstyle changes
    
    * fix the tests
    
    * Condition changes
    
    * Revert to the prev segment locations
    
    * Address review comments
    
    * checkstyle fixes
    
    * Spotless fixes
    
    * Update the comment
    
    * Change the tests and add in condition
    
    * change the class name
    
    * Compilation failures
    
    * Change the comment
    
    * Add in more conditions to test out the current segment's location
    
    * Revert operation on tables
    
    * Simplify the revert process
    
    * Make test changes
    
    * Push changes related to tests
    
    * Change the constants
    
    * Change the constants
    
    * Fix the checkstyle exceptions
    
    * Revert back the unchanged code
    
    * Change the method name
    
    * Change the variable names
    
    * Remove the changes that aren't necessary
    
    * Change the method name
    
    * Change the variable name
    
    * Change the code to add in case when keys exist in wrong segment
    
    * Resolve conflicts
    
    * Resolve conflict
    
    * Check the table type for inconsistencies
    
    * Add in protected mode test
    
    * Add in overridden method
    
    * Commit github comment update
    
    Co-authored-by: Copilot <[email protected]>
    
    * Update the github comments on hashing the pk
    
    * Fix the checkstyle exceptions
    
    * Change the HashedPrimaryKey to pk
    
    * Clear the map even when inconsistencies exist during PROTECTED mode
    
    * Checkstyle fixes and test failures
    
    * Change the test to test the reload in PROTECTED mode: PartialUpsertTableRebalanceIntegrationTest
    
    * spotless fixes
    
    ---------
    
    Co-authored-by: Xiaotian (Jackie) Jiang <[email protected]>
    Co-authored-by: Copilot <[email protected]>
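Before the diffstat, the core idea of the change — track each primary key's previous record location in a single map, with a sentinel for newly added keys, so reverting upsert metadata is one pass over one map instead of coordinating two — can be sketched as follows. All names here (`SingleMapRevertSketch`, `upsert`, `commit`, `revert`) are illustrative stand-ins, not the actual Pinot classes in this patch:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: one map from primary key to its previous
// location replaces the earlier two-map bookkeeping. A sentinel value
// marks keys that did not exist before the current batch, so a revert
// either restores the old location or removes the newly added key.
public class SingleMapRevertSketch {
  // ConcurrentHashMap rejects null values, so use a sentinel for "newly added".
  private static final String NEW_KEY = "__NEW__";

  private final Map<String, String> _primaryKeyToLocation = new ConcurrentHashMap<>();
  private final Map<String, String> _previousKeyToRecordLocationMap = new ConcurrentHashMap<>();

  public void upsert(String pk, String newLocation) {
    String previous = _primaryKeyToLocation.put(pk, newLocation);
    // Record only the first-seen previous location, so revert() restores
    // the state as of the last commit (NEW_KEY marks a newly added key).
    _previousKeyToRecordLocationMap.putIfAbsent(pk, previous == null ? NEW_KEY : previous);
  }

  public void commit() {
    // On successful commit the tracked previous locations are discarded.
    _previousKeyToRecordLocationMap.clear();
  }

  public void revert() {
    for (Map.Entry<String, String> entry : _previousKeyToRecordLocationMap.entrySet()) {
      if (NEW_KEY.equals(entry.getValue())) {
        _primaryKeyToLocation.remove(entry.getKey()); // key did not exist before
      } else {
        _primaryKeyToLocation.put(entry.getKey(), entry.getValue()); // restore old location
      }
    }
    _previousKeyToRecordLocationMap.clear();
  }

  public String locationOf(String pk) {
    return _primaryKeyToLocation.get(pk);
  }
}
```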
---
 .../pinot/controller/BaseControllerStarter.java    |   7 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  25 ++-
 .../core/data/manager/BaseTableDataManager.java    |  31 +--
 .../realtime/RealtimeSegmentDataManager.java       |   6 +-
 .../realtime/UpsertInconsistentStateConfig.java    | 102 ---------
 .../tests/CommitTimeCompactionIntegrationTest.java |  54 +++--
 ...PartialUpsertTableRebalanceIntegrationTest.java |  40 ++--
 .../models/DummyTableUpsertMetadataManager.java    |  20 +-
 .../upsert/BasePartitionUpsertMetadataManager.java | 162 ++++++---------
 ...oncurrentMapPartitionUpsertMetadataManager.java | 187 ++++++++++-------
 ...nUpsertMetadataManagerForConsistentDeletes.java | 228 ++++++++++++++-------
 .../pinot/segment/local/upsert/UpsertContext.java  |  10 +
 .../pinot/segment/local/upsert/UpsertUtils.java    |  42 +++-
 .../segment/local/utils/TableConfigUtils.java      |  11 +-
 .../BasePartitionUpsertMetadataManagerTest.java    |  37 ++--
 ...rrentMapPartitionUpsertMetadataManagerTest.java |  76 +++++--
 .../server/starter/helix/BaseServerStarter.java    |   7 +-
 .../starter/helix/HelixInstanceDataManager.java    |  35 ++--
 .../apache/pinot/spi/utils/CommonConstants.java    |  17 +-
 .../ConsumingSegmentConsistencyModeListener.java   | 134 ++++++++++++
 20 files changed, 757 insertions(+), 474 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 31ed430eeec..14ae16a0bbd 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -135,7 +135,6 @@ import org.apache.pinot.controller.validation.ResourceUtilizationChecker;
 import org.apache.pinot.controller.validation.ResourceUtilizationManager;
 import org.apache.pinot.controller.validation.StorageQuotaChecker;
 import org.apache.pinot.controller.validation.UtilizationChecker;
-import org.apache.pinot.core.data.manager.realtime.UpsertInconsistentStateConfig;
 import org.apache.pinot.core.instance.context.ControllerContext;
 import org.apache.pinot.core.periodictask.PeriodicTask;
 import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
@@ -159,6 +158,7 @@ import org.apache.pinot.spi.services.ServiceRole;
 import org.apache.pinot.spi.services.ServiceStartable;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix;
+import org.apache.pinot.spi.utils.ConsumingSegmentConsistencyModeListener;
 import org.apache.pinot.spi.utils.InstanceTypeUtils;
 import org.apache.pinot.spi.utils.NetUtils;
 import org.apache.pinot.spi.utils.PinotMd5Mode;
@@ -768,8 +768,9 @@ public abstract class BaseControllerStarter implements ServiceStartable {
     _serviceStatusCallbackList.add(generateServiceStatusCallback(_helixParticipantManager));
 
     _clusterConfigChangeHandler.registerClusterConfigChangeListener(ContinuousJfrStarter.INSTANCE);
-    _clusterConfigChangeHandler.registerClusterConfigChangeListener(UpsertInconsistentStateConfig.getInstance());
-    LOGGER.info("Registered UpsertInconsistentStateConfig as cluster config change listener");
+    _clusterConfigChangeHandler.registerClusterConfigChangeListener(
+        ConsumingSegmentConsistencyModeListener.getInstance());
+    LOGGER.info("Registered ConsumingSegmentConsistencyModeListener as cluster config change listener");
   }
 
  protected PinotLLCRealtimeSegmentManager createPinotLLCRealtimeSegmentManager() {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 113cdac8e5c..7913e88c07c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -105,8 +105,8 @@ import org.apache.pinot.controller.helix.core.retention.strategy.TimeRetentionSt
 import org.apache.pinot.controller.helix.core.util.MessagingServiceUtils;
 import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
 import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
-import org.apache.pinot.core.data.manager.realtime.UpsertInconsistentStateConfig;
 import org.apache.pinot.core.util.PeerServerSegmentFinder;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
@@ -134,6 +134,7 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
+import org.apache.pinot.spi.utils.ConsumingSegmentConsistencyModeListener;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.TimeUtils;
@@ -2788,21 +2789,25 @@ public class PinotLLCRealtimeSegmentManager {
   * Validates that force commit is allowed for the given table.
   * Throws IllegalStateException if force commit is disabled for partial-upsert tables
   * or upsert tables with dropOutOfOrder enabled when replication > 1.
+   * Force commit is always allowed for tables without inconsistent state configs.
   */
  private void validateForceCommitAllowed(String tableNameWithType) {
    TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType);
    if (tableConfig == null) {
      throw new IllegalStateException("Table config not found for table: " + tableNameWithType);
    }
-    UpsertInconsistentStateConfig configInstance = UpsertInconsistentStateConfig.getInstance();
-    if (!configInstance.isForceCommitReloadAllowed(tableConfig)) {
-      throw new IllegalStateException(
-          "Force commit disabled for table: " + tableNameWithType
-              + ". Table is configured as partial upsert or dropOutOfOrderRecord=true with replication > 1, "
-              + "which can cause data inconsistency during force commit. "
-              + "Current cluster config '" + configInstance.getConfigKey() + "' is set to: "
-              + configInstance.isForceCommitReloadEnabled()
-              + ". To enable force commit, set this config to 'true'.");
+    // Only restrict force commit for tables with inconsistent state configs
+    // (partial upsert or dropOutOfOrder tables with replication > 1)
+    boolean isInconsistentMetadataDuringConsumption =
+        TableConfigUtils.isTableTypeInconsistentDuringConsumption(tableConfig);
+    ConsumingSegmentConsistencyModeListener configInstance = ConsumingSegmentConsistencyModeListener.getInstance();
+    if (!configInstance.isForceCommitAllowed() && isInconsistentMetadataDuringConsumption) {
+      throw new IllegalStateException("Force commit disabled for table: " + tableNameWithType
+          + ". Table is configured as partial upsert or dropOutOfOrderRecord=true with replication > 1, "
+          + "which can cause data inconsistency during force commit. " + "Current cluster config '"
+          + configInstance.getConfigKey() + "' is set to: " + configInstance.getConsistencyMode()
+          + ". To enable safer force commit, set cluster config '" + configInstance.getConfigKey()
+          + "' to 'PROTECTED'.");
    }
  }
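The condition in the hunk above reduces to a small predicate: force commit is blocked only when the table can reach inconsistent upsert metadata during consumption AND the cluster-wide consistency mode is the restrictive default. The mode names RESTRICTED, PROTECTED, and UNSAFE come from this patch; the class and method below are a hypothetical standalone mirror, not Pinot's actual code:

```java
// Hypothetical mirror of the force-commit gate added in this patch.
// Commit is blocked only for tables whose config can produce inconsistent
// upsert state during consumption, and only while the cluster mode is
// the restrictive default.
public class ForceCommitGate {
  public enum Mode { RESTRICTED, PROTECTED, UNSAFE }

  public static boolean isForceCommitAllowed(boolean tableInconsistentDuringConsumption, Mode mode) {
    // PROTECTED and UNSAFE both permit force commit; RESTRICTED blocks it
    // for tables that could end up with inconsistent metadata.
    boolean modeAllows = mode != Mode.RESTRICTED;
    return !tableInconsistentDuringConsumption || modeAllows;
  }
}
```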
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 07dd67d376a..815eb5bdb55 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -67,7 +67,6 @@ import org.apache.pinot.common.utils.config.TierConfigUtils;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
 import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
-import org.apache.pinot.core.data.manager.realtime.UpsertInconsistentStateConfig;
 import org.apache.pinot.core.util.PeerServerSegmentFinder;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.StaleSegment;
@@ -87,6 +86,7 @@ import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
 import org.apache.pinot.segment.local.utils.SegmentOperationsThrottlerSet;
 import org.apache.pinot.segment.local.utils.SegmentReloadSemaphore;
 import org.apache.pinot.segment.local.utils.ServerReloadJobStatusCache;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
@@ -115,6 +115,7 @@ import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.ConsumingSegmentConsistencyModeListener;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -957,21 +958,25 @@ public abstract class BaseTableDataManager implements TableDataManager {
     if (segmentDataManager instanceof RealtimeSegmentDataManager) {
       // Use force commit to reload consuming segment
       if (_instanceDataManagerConfig.shouldReloadConsumingSegment()) {
-        // Force-committing consuming segments is enabled by default.
-        // For partial-upsert tables or upserts with out-of-order events enabled (notably when replication > 1),
-        // winner selection could incorrectly favor replicas with fewer consumed rows.
-        // This triggered unnecessary reconsumption and resulted in inconsistent upsert state.
-        // The new fix restores and correct segment metadata after inconsistencies are noticed.
-        // To toggle, existing Force commit behavior dynamically use the cluster config
-        // `pinot.server.upsert.force.commit.reload` without restarting servers.
+        // Force-committing consuming segments is restricted only for tables with inconsistent state configs
+        // (partial-upsert or dropOutOfOrderRecord=true with replication > 1).
+        // For these tables, winner selection could incorrectly favor replicas with fewer consumed rows,
+        // triggering unnecessary reconsumption and resulting in inconsistent upsert state.
+        // To enable force commit for such tables, change the cluster config
+        // `pinot.server.consuming.segment.consistency.mode` to PROTECTED for safer reload.
        TableConfig tableConfig = indexLoadingConfig.getTableConfig();
-        UpsertInconsistentStateConfig config = UpsertInconsistentStateConfig.getInstance();
-        if (tableConfig != null && config.isForceCommitReloadAllowed(tableConfig)) {
+        ConsumingSegmentConsistencyModeListener config = ConsumingSegmentConsistencyModeListener.getInstance();
+        boolean isTableTypeInconsistentDuringConsumption =
+            TableConfigUtils.isTableTypeInconsistentDuringConsumption(tableConfig);
+        // Allow force commit if:
+        // 1. Table doesn't have inconsistent configs (non-upsert or standard upsert tables), OR
+        // 2. Consistency mode is PROTECTED or UNSAFE (isForceCommitAllowed = true)
+        if (tableConfig == null || (isTableTypeInconsistentDuringConsumption && !config.isForceCommitAllowed())) {
+          _logger.warn("Skipping reload (force commit) on consuming segment: {} due to inconsistent state config. "
+              + "Change the cluster config: {} to `PROTECTED` for safer commit", segmentName, config.getConfigKey());
+        } else {
          _logger.info("Reloading (force committing) consuming segment: {}", segmentName);
          ((RealtimeSegmentDataManager) segmentDataManager).forceCommit();
-        } else {
-          _logger.warn("Skipping reload (force commit) on consuming segment: {} due to inconsistent state config. "
-              + "Control via cluster config: {}", segmentName, config.getConfigKey());
        }
      } else {
        _logger.warn("Skip reloading consuming segment: {} as configured", segmentName);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index d555c13403b..37c9511d2bb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -83,7 +83,6 @@ import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.ParallelSegmentConsumptionPolicy;
 import org.apache.pinot.spi.data.Schema;
@@ -1882,9 +1881,8 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
    //     TODO: Revisit the non-pauseless handling
    if (_partitionUpsertMetadataManager != null) {
      UpsertContext upsertContext = _partitionUpsertMetadataManager.getContext();
-      if (upsertContext.isAllowPartialUpsertConsumptionDuringCommit() || (
-          upsertContext.getUpsertMode() != UpsertConfig.Mode.PARTIAL && !upsertContext.isDropOutOfOrderRecord()
-              && upsertContext.getOutOfOrderRecordColumn() == null)) {
+      if (upsertContext.isAllowPartialUpsertConsumptionDuringCommit()
+          || !upsertContext.isTableTypeInconsistentDuringConsumption()) {
        return ParallelSegmentConsumptionPolicy.ALLOW_ALWAYS;
      }
      return pauseless ? ParallelSegmentConsumptionPolicy.ALLOW_DURING_BUILD_ONLY
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/UpsertInconsistentStateConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/UpsertInconsistentStateConfig.java
deleted file mode 100644
index 18953f42e32..00000000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/UpsertInconsistentStateConfig.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.data.manager.realtime;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.annotation.Nullable;
-import org.apache.pinot.segment.local.utils.TableConfigUtils;
-import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.utils.CommonConstants.ConfigChangeListenerConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Singleton class to manage the configuration for force commit and reload on consuming segments
- * for upsert tables with inconsistent state configurations (partial upsert or dropOutOfOrderRecord=true
- * with consistency mode NONE and replication > 1).
- *
- * This configuration is dynamically updatable via ZK cluster config without requiring a server restart.
- */
-public class UpsertInconsistentStateConfig implements PinotClusterConfigChangeListener {
-  private static final Logger LOGGER = LoggerFactory.getLogger(UpsertInconsistentStateConfig.class);
-  private static final UpsertInconsistentStateConfig INSTANCE = new UpsertInconsistentStateConfig();
-
-  private final AtomicBoolean _forceCommitReloadEnabled =
-      new AtomicBoolean(ConfigChangeListenerConstants.DEFAULT_FORCE_COMMIT_RELOAD);
-
-  private UpsertInconsistentStateConfig() {
-  }
-
-  public static UpsertInconsistentStateConfig getInstance() {
-    return INSTANCE;
-  }
-
-  /**
-   * Checks if force commit/reload is allowed for the given table config.
-   *
-   * @param tableConfig the table config to check, may be null
-   * @return true if force commit/reload is allowed (either globally enabled or table has no inconsistent configs)
-   */
-  public boolean isForceCommitReloadAllowed(@Nullable TableConfig tableConfig) {
-    if (tableConfig == null) {
-      return false;
-    }
-    if (_forceCommitReloadEnabled.get()) {
-      return true;
-    }
-    // Allow if table doesn't have inconsistent state configs
-    return !TableConfigUtils.checkForInconsistentStateConfigs(tableConfig);
-  }
-
-  /**
-   * Returns whether force commit/reload is currently enabled globally.
-   */
-  public boolean isForceCommitReloadEnabled() {
-    return _forceCommitReloadEnabled.get();
-  }
-
-  /**
-   * Returns the current config key used for this setting.
-   */
-  public String getConfigKey() {
-    return ConfigChangeListenerConstants.FORCE_COMMIT_RELOAD_CONFIG;
-  }
-
-  @Override
-  public void onChange(Set<String> changedConfigs, Map<String, String> clusterConfigs) {
-    if (!changedConfigs.contains(ConfigChangeListenerConstants.FORCE_COMMIT_RELOAD_CONFIG)) {
-      return;
-    }
-
-    String configValue = clusterConfigs.get(ConfigChangeListenerConstants.FORCE_COMMIT_RELOAD_CONFIG);
-    boolean forceCommitReloadAllowed = (configValue == null)
-        ? ConfigChangeListenerConstants.DEFAULT_FORCE_COMMIT_RELOAD
-        : Boolean.parseBoolean(configValue);
-
-    boolean previousValue = _forceCommitReloadEnabled.getAndSet(forceCommitReloadAllowed);
-    if (previousValue != forceCommitReloadAllowed) {
-      LOGGER.info("Updated cluster config: {} from {} to {}",
-          ConfigChangeListenerConstants.FORCE_COMMIT_RELOAD_CONFIG, previousValue, forceCommitReloadAllowed);
-    }
-    }
-  }
-}
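The boolean listener deleted above is replaced in this patch by a mode-based ConsumingSegmentConsistencyModeListener. The general pattern — a singleton holding the current mode in an AtomicReference, updated when the watched ZK cluster config key changes, with no server restart — can be sketched roughly as below. This is a simplified, hypothetical stand-in, not the actual replacement class; only the config key and the mode names are taken from the patch:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

// Simplified sketch of a mode-based cluster-config listener: a singleton
// keeps the current mode in an AtomicReference and swaps it whenever the
// watched cluster config key changes.
public class ConsistencyModeListenerSketch {
  public enum Mode { RESTRICTED, PROTECTED, UNSAFE }

  private static final String CONFIG_KEY = "pinot.server.consuming.segment.consistency.mode";
  private static final Mode DEFAULT_MODE = Mode.RESTRICTED;
  private static final ConsistencyModeListenerSketch INSTANCE = new ConsistencyModeListenerSketch();

  private final AtomicReference<Mode> _mode = new AtomicReference<>(DEFAULT_MODE);

  public static ConsistencyModeListenerSketch getInstance() {
    return INSTANCE;
  }

  public Mode getConsistencyMode() {
    return _mode.get();
  }

  public void onChange(Set<String> changedConfigs, Map<String, String> clusterConfigs) {
    if (!changedConfigs.contains(CONFIG_KEY)) {
      return;
    }
    String value = clusterConfigs.get(CONFIG_KEY);
    // Fall back to the default when the key is deleted or unparseable.
    Mode newMode = DEFAULT_MODE;
    if (value != null) {
      try {
        newMode = Mode.valueOf(value.toUpperCase());
      } catch (IllegalArgumentException ignored) {
      }
    }
    _mode.set(newMode);
  }

  // Convenience for exercising the sketch: null simulates key deletion.
  public void applyConfigValue(String value) {
    onChange(Set.of(CONFIG_KEY), value == null ? Map.of() : Map.of(CONFIG_KEY, value));
  }
}
```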
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CommitTimeCompactionIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CommitTimeCompactionIntegrationTest.java
index a8f219a3819..20ac0f8381d 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CommitTimeCompactionIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CommitTimeCompactionIntegrationTest.java
@@ -33,6 +33,8 @@ import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants.ConfigChangeListenerConstants;
+import org.apache.pinot.spi.utils.ConsumingSegmentConsistencyModeListener;
 import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -533,22 +535,29 @@ public class CommitTimeCompactionIntegrationTest extends BaseClusterIntegrationT
     validatePreCommitState(tableNameWithoutCompaction, tableNameWithCompaction,
         tableNameWithCompactionColumnMajor, 3);
 
-    // Perform commit and wait for completion
-    performCommitAndWait(tableNameWithoutCompaction, tableNameWithCompaction,
-        tableNameWithCompactionColumnMajor, 20_000L, 4, 2);
+    try {
+      setConsumingSegmentConsistencyMode(ConsumingSegmentConsistencyModeListener.Mode.PROTECTED);
 
-    // Brief wait to ensure all commit operations are complete
-    waitForAllDocsLoaded(tableNameWithCompaction, 60_000L, 3);
-    waitForAllDocsLoaded(tableNameWithCompactionColumnMajor, 60_000L, 3);
+      // Perform commit and wait for completion
+      performCommitAndWait(tableNameWithoutCompaction, tableNameWithCompaction,
+          tableNameWithCompactionColumnMajor, 20_000L, 4, 2);
 
-    // Validate post-commit compaction effectiveness and data integrity (expecting 3 records, min 2 removed)
-    validatePostCommitCompaction(tableNameWithoutCompaction, tableNameWithCompaction,
-        tableNameWithCompactionColumnMajor, 3, 2, 0.95);
+      // Brief wait to ensure all commit operations are complete
+      waitForAllDocsLoaded(tableNameWithCompaction, 60_000L, 3);
+      waitForAllDocsLoaded(tableNameWithCompactionColumnMajor, 60_000L, 3);
 
-    // Clean up
-    cleanupTablesAndSchemas(
-        List.of(tableNameWithoutCompaction, tableNameWithCompaction, tableNameWithCompactionColumnMajor),
-        List.of(tableNameWithoutCompaction, tableNameWithCompaction, tableNameWithCompactionColumnMajor));
+      // Validate post-commit compaction effectiveness and data integrity (expecting 3 records, min 2 removed)
+      validatePostCommitCompaction(tableNameWithoutCompaction, tableNameWithCompaction,
+          tableNameWithCompactionColumnMajor, 3, 2, 0.95);
+    } finally {
+      try {
+        resetConsumingSegmentConsistencyMode();
+      } finally {
+        cleanupTablesAndSchemas(
+            List.of(tableNameWithoutCompaction, tableNameWithCompaction, tableNameWithCompactionColumnMajor),
+            List.of(tableNameWithoutCompaction, tableNameWithCompaction, tableNameWithCompactionColumnMajor));
+      }
+    }
   }
 
   @Test
@@ -1222,6 +1231,25 @@ public class CommitTimeCompactionIntegrationTest extends BaseClusterIntegrationT
     });
   }
 
+  private void setConsumingSegmentConsistencyMode(ConsumingSegmentConsistencyModeListener.Mode mode)
+      throws Exception {
+    updateClusterConfig(Map.of(ConfigChangeListenerConstants.CONSUMING_SEGMENT_CONSISTENCY_MODE, mode.name()));
+    waitForConsumingSegmentConsistencyMode(mode);
+  }
+
+  private void resetConsumingSegmentConsistencyMode()
+      throws Exception {
+    deleteClusterConfig(ConfigChangeListenerConstants.CONSUMING_SEGMENT_CONSISTENCY_MODE);
+    waitForConsumingSegmentConsistencyMode(
+        ConsumingSegmentConsistencyModeListener.Mode.DEFAULT_CONSUMING_SEGMENT_CONSISTENCY_MODE);
+  }
+
+  private void waitForConsumingSegmentConsistencyMode(ConsumingSegmentConsistencyModeListener.Mode expectedMode) {
+    TestUtils.waitForCondition(
+        aVoid -> ConsumingSegmentConsistencyModeListener.getInstance().getConsistencyMode() == expectedMode, 10_000L,
+        "Timed out waiting for consuming segment consistency mode to become: " + expectedMode);
+  }
+
   @Test
   public void testCommitTimeCompactionPreservesDeletedRecords()
       throws Exception {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
index e3bbde37258..ae2fc011285 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
@@ -49,6 +49,7 @@ import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.ConsumingSegmentConsistencyModeListener;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
@@ -232,20 +233,31 @@ public class PartialUpsertTableRebalanceIntegrationTest extends BaseClusterInteg
     pushAvroIntoKafka(_avroFiles);
     waitForAllDocsLoaded(600_000L, 300);
 
-    String statusResponse = reloadRealtimeTable(getTableName());
-    Map<String, String> statusResponseJson =
-        JsonUtils.stringToObject(statusResponse, new TypeReference<Map<String, String>>() {
-        });
-    String reloadResponse = statusResponseJson.get("status");
-    int jsonStartIndex = reloadResponse.indexOf("{");
-    String trimmedResponse = reloadResponse.substring(jsonStartIndex);
-    Map<String, Map<String, String>> reloadStatus =
-        JsonUtils.stringToObject(trimmedResponse, new TypeReference<Map<String, Map<String, String>>>() {
-        });
-    String reloadJobId = reloadStatus.get(REALTIME_TABLE_NAME).get("reloadJobId");
-    waitForReloadToComplete(reloadJobId, 600_000L);
-    waitForAllDocsLoaded(600_000L, 300);
-    verifyIdealState(4, NUM_SERVERS); // 4 because reload triggers commit of consuming segments
+    // Partial-upsert tables need PROTECTED consuming segment consistency mode for reload to force-commit
+    // consuming segments; RESTRICTED (default) skips force-commit and reload never completes.
+    try {
+      updateClusterConfig(
+          Map.of(CommonConstants.ConfigChangeListenerConstants.CONSUMING_SEGMENT_CONSISTENCY_MODE,
+              ConsumingSegmentConsistencyModeListener.Mode.PROTECTED.name()));
+      Thread.sleep(5000); // Allow server to pick up cluster config from ZK
+
+      String statusResponse = reloadRealtimeTable(getTableName());
+      Map<String, String> statusResponseJson =
+          JsonUtils.stringToObject(statusResponse, new TypeReference<Map<String, String>>() {
+          });
+      String reloadResponse = statusResponseJson.get("status");
+      int jsonStartIndex = reloadResponse.indexOf("{");
+      String trimmedResponse = reloadResponse.substring(jsonStartIndex);
+      Map<String, Map<String, String>> reloadStatus =
+          JsonUtils.stringToObject(trimmedResponse, new TypeReference<Map<String, Map<String, String>>>() {
+          });
+      String reloadJobId = reloadStatus.get(REALTIME_TABLE_NAME).get("reloadJobId");
+      waitForReloadToComplete(reloadJobId, 600_000L);
+      waitForAllDocsLoaded(600_000L, 300);
+      verifyIdealState(4, NUM_SERVERS); // 4 because reload triggers commit of consuming segments
+    } finally {
+      deleteClusterConfig(CommonConstants.ConfigChangeListenerConstants.CONSUMING_SEGMENT_CONSISTENCY_MODE);
+    }
   }
 
   @AfterMethod
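The cluster-config toggle in the test above switches the server between two consuming-segment consistency modes. As a minimal, self-contained sketch of that decision (the class, enum, and helper names here are ours for illustration; the real check is `shouldRevertMetadataOnInconsistency` in `BasePartitionUpsertMetadataManager` later in this diff):

```java
// Hypothetical standalone sketch, NOT Pinot API: mirrors the three-way guard in
// BasePartitionUpsertMetadataManager.shouldRevertMetadataOnInconsistency.
public class ConsistencyModeSketch {
  enum Mode { RESTRICTED, PROTECTED }

  // Revert upsert metadata only when all three conditions hold: the cluster is in
  // PROTECTED mode, the segment being replaced is still mutable (consuming), and
  // the table config can produce inconsistent state during consumption.
  static boolean shouldRevert(Mode clusterMode, boolean oldSegmentIsMutable, boolean tableConfigInconsistent) {
    return clusterMode == Mode.PROTECTED && oldSegmentIsMutable && tableConfigInconsistent;
  }

  public static void main(String[] args) {
    System.out.println(shouldRevert(Mode.PROTECTED, true, true));   // true
    System.out.println(shouldRevert(Mode.RESTRICTED, true, true));  // false
  }
}
```

In RESTRICTED mode (the default) any one failed condition means the old keys are simply removed, preserving the pre-existing behavior for backward compatibility.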
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java
index b7c9b733ef4..6482612be90 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java
@@ -33,6 +33,7 @@ import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.MutableSegment;
 import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
 
@@ -78,10 +79,6 @@ public class DummyTableUpsertMetadataManager extends BaseTableUpsertMetadataMana
       return false;
     }
 
-    @Override
-    protected void removeNewlyAddedKeys(IndexSegment oldSegment) {
-    }
-
     @Override
     protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds) {
     }
@@ -91,17 +88,26 @@ public class DummyTableUpsertMetadataManager extends BaseTableUpsertMetadataMana
       return null;
     }
 
+    @Override
+    protected void revertAndRemoveSegment(IndexSegment segment,
+        Iterator<Map.Entry<Integer, PrimaryKey>> primaryKeyIterator) {
+    }
+
+    @Override
+    protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey> primaryKeyIterator) {
+    }
+
     @Override
     protected void doRemoveExpiredPrimaryKeys() {
     }
 
     @Override
-    protected void revertCurrentSegmentUpsertMetadata(IndexSegment oldSegment,
-        ThreadSafeMutableRoaringBitmap validDocIds, ThreadSafeMutableRoaringBitmap queryableDocIds) {
+    protected int getPrevKeyToRecordLocationSize() {
+      return 0;
     }
 
     @Override
-    protected void eraseKeyToPreviousLocationMap() {
+    protected void clearPrevKeyToRecordLocation() {
     }
   }
 }
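The dummy manager above only stubs the new hooks. The behavior they back, implemented in `ConcurrentMapPartitionUpsertMetadataManager` later in this diff, keeps at most one previous location per primary key, preferring the largest comparison value among rejected out-of-order records. A toy model of that rule (the class, field, and method names are illustrative, not Pinot's):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy model, NOT Pinot code: for each primary key, remember only the largest
// comparison value among out-of-order records, mirroring the
// `compareTo(prevRecordLocation.getComparisonValue()) >= 0` guard in the diff.
public class OutOfOrderTrackingSketch {
  static final Map<String, Long> PREVIOUS = new ConcurrentHashMap<>();

  static void trackRejected(String primaryKey, long comparisonValue) {
    // merge keeps the existing value unless the new one is larger
    PREVIOUS.merge(primaryKey, comparisonValue, Math::max);
  }

  public static void main(String[] args) {
    trackRejected("pk1", 5L);
    trackRejected("pk1", 3L); // older out-of-order record; 5 is kept
    System.out.println(PREVIOUS.get("pk1")); // 5
  }
}
```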
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index aa43d89f819..1c9975da3f9 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -49,7 +49,6 @@ import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
-import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
 import org.apache.pinot.segment.local.utils.HashUtils;
 import org.apache.pinot.segment.local.utils.SegmentPreloadUtils;
 import org.apache.pinot.segment.local.utils.WatermarkUtils;
@@ -67,6 +66,7 @@ import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.apache.pinot.spi.utils.BooleanUtils;
+import org.apache.pinot.spi.utils.ConsumingSegmentConsistencyModeListener;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.roaringbitmap.PeekableIntIterator;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
@@ -293,7 +293,6 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     }
     try {
       doAddSegment((ImmutableSegmentImpl) segment);
-      eraseKeyToPreviousLocationMap();
       _trackedSegments.add(segment);
       if (_enableSnapshot) {
         _updatedSegmentsSinceLastSnapshot.add(segment);
@@ -420,7 +419,6 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     }
     try {
       doPreloadSegment((ImmutableSegmentImpl) segment);
-      eraseKeyToPreviousLocationMap();
       _trackedSegments.add(segment);
       _updatedSegmentsSinceLastSnapshot.add(segment);
     } finally {
@@ -587,7 +585,6 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     }
     try {
       doReplaceSegment(segment, oldSegment);
-      eraseKeyToPreviousLocationMap();
       if (!(segment instanceof EmptyIndexSegment)) {
         _trackedSegments.add(segment);
         if (_enableSnapshot) {
@@ -637,11 +634,9 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
         System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
   }
 
-  private static final int MAX_UPSERT_REVERT_RETRIES = 3;
-
   /**
-   * NOTE: We allow passing in validDocIds and queryableDocIds here so that the value can be easily accessed from the
-   *       tests. The passed in bitmaps should always be empty.
+   * NOTE: We allow passing in validDocIds and queryableDocIds here so that the value can be easily accessed from tests.
+   *       The passed-in bitmaps should always be empty.
    */
   @VisibleForTesting
   public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
@@ -678,108 +673,74 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
       validDocIdsForOldSegment = getValidDocIdsForOldSegment(oldSegment);
     }
     if (validDocIdsForOldSegment != null && !validDocIdsForOldSegment.isEmpty()) {
-      checkForInconsistencies(segment, validDocIds, queryableDocIds, oldSegment, validDocIdsForOldSegment, segmentName);
+      if (_context.isTableTypeInconsistentDuringConsumption()) {
+        if (shouldRevertMetadataOnInconsistency(oldSegment)) {
+          // If there are still valid docs in the old segment, validate and revert the metadata of the
+          // consuming segment in place
+          revertSegmentUpsertMetadata(oldSegment, segmentName, validDocIdsForOldSegment);
+          return;
+        } else {
+          logInconsistentResults(segmentName, validDocIdsForOldSegment.getCardinality());
+        }
+      }
       removeSegment(oldSegment, validDocIdsForOldSegment);
     }
   }
 
-  void checkForInconsistencies(ImmutableSegment segment, ThreadSafeMutableRoaringBitmap validDocIds,
-      ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment oldSegment,
-      MutableRoaringBitmap validDocIdsForOldSegment, String segmentName) {
-    int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
-    boolean isConsumingSegmentSeal = !(oldSegment instanceof ImmutableSegment);
-    // For partial-upsert tables and upsert tables with dropOutOfOrderRecord=true or outOfOrderRecordColumn configured,
-    // we do not store previous record locations and instead remove all primary keys that are not replaced. This can
-    // lead to inconsistencies across replicas when a consuming segment is replaced by a committed segment generated
-    // on a different server with a different set of records. Such scenarios can occur when stream consumers do not
-    // guarantee the same consumption order, or when a segment with fewer consumed rows replaces another segment.
-    // To prevent these inconsistencies, we persist the previous record locations so that we can revert to them and
-    // restore consistency across replicas.
-    if (isConsumingSegmentSeal && (_context.isDropOutOfOrderRecord() || _context.getOutOfOrderRecordColumn() != null)) {
-      _logger.warn("Found {} primary keys not replaced when sealing consuming segment: {} for upsert table with "
-              + "dropOutOfOrderRecord or outOfOrderRecordColumn enabled. This can potentially cause inconsistency "
-              + "between replicas. Reverting back metadata changes and triggering segment replacement.",
-          numKeysNotReplaced,
-          segmentName);
-      revertSegmentUpsertMetadataWithRetry(segment, validDocIds, queryableDocIds, oldSegment, segmentName);
-    } else if (isConsumingSegmentSeal && _partialUpsertHandler != null) {
-      _logger.warn("Found {} primary keys not replaced when sealing consuming segment: {} for partial-upsert table. "
-          + "This can potentially cause inconsistency between replicas. "
-          + "Reverting metadata changes and triggering segment replacement.", numKeysNotReplaced, segmentName);
-      revertSegmentUpsertMetadataWithRetry(segment, validDocIds, queryableDocIds, oldSegment, segmentName);
-    } else {
-      _logger.warn("Found {} primary keys not replaced for the segment: {}.", numKeysNotReplaced, segmentName);
-    }
+  /**
+   * Determines whether metadata should be reverted when inconsistencies are detected during segment replacement.
+   * This is only applicable when in PROTECTED mode, the old segment is a mutable segment,
+   * and the table has inconsistent state configurations.
+   *
+   * @param oldSegment the old segment being replaced
+   * @return true if metadata revert should be performed on inconsistency
+   */
+  public boolean shouldRevertMetadataOnInconsistency(IndexSegment oldSegment) {
+    return ConsumingSegmentConsistencyModeListener.getInstance().getConsistencyMode()
+        .equals(ConsumingSegmentConsistencyModeListener.Mode.PROTECTED) && oldSegment instanceof MutableSegment
+        && _context.isTableTypeInconsistentDuringConsumption();
   }
 
   /**
-   * Reverts segment upsert metadata and retries addOrReplaceSegment with a maximum retry limit to prevent infinite
-   * recursion in case of persistent inconsistencies.
+   * Reverts segment upsert metadata.
    */
-  void revertSegmentUpsertMetadataWithRetry(ImmutableSegment segment, ThreadSafeMutableRoaringBitmap validDocIds,
-      ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment oldSegment, String segmentName) {
-    for (int retryCount = 0; retryCount < MAX_UPSERT_REVERT_RETRIES; retryCount++) {
-      revertCurrentSegmentUpsertMetadata(oldSegment, validDocIds, queryableDocIds);
-      MutableRoaringBitmap validDocIdsForOldSegment = getValidDocIdsForOldSegment(oldSegment);
-      try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
-          _comparisonColumns, _deleteRecordColumn)) {
-        Iterator<RecordInfo> latestRecordInfoIterator =
-            UpsertUtils.getRecordInfoIterator(recordInfoReader, segment.getSegmentMetadata().getTotalDocs());
-        addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, queryableDocIds, latestRecordInfoIterator,
-            oldSegment, validDocIdsForOldSegment);
-      } catch (Exception e) {
-        throw new RuntimeException(
-            String.format("Caught exception while replacing segment metadata during inconsistencies: %s, table: %s",
-                segmentName, _tableNameWithType), e);
-      }
-
-      validDocIdsForOldSegment = getValidDocIdsForOldSegment(oldSegment);
-      if (validDocIdsForOldSegment.isEmpty()) {
-        _logger.info("Successfully resolved inconsistency for segment: {} after {} retry attempt(s)", segmentName,
-            retryCount + 1);
-        return;
-      }
-
-      int numKeysStillNotReplaced = validDocIdsForOldSegment.getCardinality();
-      if (retryCount < MAX_UPSERT_REVERT_RETRIES - 1) {
-        _logger.warn("Retry {}/{}: Still found {} primary keys not replaced for segment: {}. Retrying...",
-            retryCount + 1, MAX_UPSERT_REVERT_RETRIES, numKeysStillNotReplaced, segmentName);
-      } else {
-        _logger.error("Exhausted all {} retries for segment: {}. Found {} primary keys still not replaced. "
-                + "Proceeding with current state which may cause inconsistency.", MAX_UPSERT_REVERT_RETRIES,
-            segmentName,
-            numKeysStillNotReplaced);
-        if (_context.isDropOutOfOrderRecord() || _context.getOutOfOrderRecordColumn() != null) {
-          _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REALTIME_UPSERT_INCONSISTENT_ROWS,
-              numKeysStillNotReplaced);
-        } else if (_partialUpsertHandler != null) {
-          _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
-              numKeysStillNotReplaced);
-        }
-      }
+  protected void revertSegmentUpsertMetadata(IndexSegment oldSegment, String segmentName,
+      MutableRoaringBitmap validDocIdsForOldSegment) {
+    _logger.info("Inconsistencies noticed for the segment: {} across servers, reverting the metadata to resolve...",
+        segmentName);
+    // Revert the keys in the segment to previous location and remove the newly added keys
+    removeSegment(oldSegment, validDocIdsForOldSegment);
+    if (getPrevKeyToRecordLocationSize() == 0) {
+      _logger.info("Successfully resolved inconsistency for segment: {} across servers", segmentName);
+      return;
+    }
+    int numKeysStillNotReplaced = getPrevKeyToRecordLocationSize();
+    if (numKeysStillNotReplaced > 0) {
+      logInconsistentResults(segmentName, numKeysStillNotReplaced);
+      // Clear the map when inconsistencies still exist for the consuming segments
+      clearPrevKeyToRecordLocation();
     }
   }
 
-  protected abstract void removeNewlyAddedKeys(IndexSegment oldSegment);
-
-  protected abstract void eraseKeyToPreviousLocationMap();
-
-  protected abstract void revertCurrentSegmentUpsertMetadata(IndexSegment oldSegment,
-      ThreadSafeMutableRoaringBitmap validDocIds, ThreadSafeMutableRoaringBitmap queryableDocIds);
+  protected void logInconsistentResults(String segmentName, int numKeysStillNotReplaced) {
+    _logger.error("Found {} primary keys not replaced for segment: {}. "
+            + "Proceeding with current state which may cause inconsistency. To correct this behaviour from now, set "
+            + "cluster config: `pinot.server.consuming.segment.consistency.mode` to `PROTECTED`",
+        numKeysStillNotReplaced, segmentName);
+    if (_context.isDropOutOfOrderRecord() || _context.getOutOfOrderRecordColumn() != null) {
+      _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REALTIME_UPSERT_INCONSISTENT_ROWS,
+          numKeysStillNotReplaced);
+    } else if (_partialUpsertHandler != null) {
+      _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
+          numKeysStillNotReplaced);
+    }
+  }
 
   private MutableRoaringBitmap getValidDocIdsForOldSegment(IndexSegment oldSegment) {
     return oldSegment.getValidDocIds() != null ? oldSegment.getValidDocIds().getMutableRoaringBitmap() : null;
   }
 
-  protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds) {
-    try (PrimaryKeyReader primaryKeyReader = new PrimaryKeyReader(segment, _primaryKeyColumns)) {
-      removeSegment(segment, UpsertUtils.getPrimaryKeyIterator(primaryKeyReader, validDocIds));
-    } catch (Exception e) {
-      throw new RuntimeException(
-          String.format("Caught exception while removing segment: %s, table: %s, message: %s", segment.getSegmentName(),
-              _tableNameWithType, e.getMessage()), e);
-    }
-  }
+  protected abstract void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds);
 
   protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey> primaryKeyIterator) {
     throw new UnsupportedOperationException("Both removeSegment(segment, validDocID) and "
@@ -825,7 +786,11 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     }
 
     _logger.info("Removing {} primary keys for segment: {}", validDocIds.getCardinality(), segmentName);
-    removeSegment(segment, validDocIds);
+    if (shouldRevertMetadataOnInconsistency(segment)) {
+      revertSegmentUpsertMetadata(segment, segmentName, validDocIds);
+    } else {
+      removeSegment(segment, validDocIds);
+    }
 
     // Update metrics
     long numPrimaryKeys = getNumPrimaryKeys();
@@ -1142,6 +1107,9 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     }
   }
 
+  protected abstract void revertAndRemoveSegment(IndexSegment segment,
+      Iterator<Map.Entry<Integer, PrimaryKey>> primaryKeyIterator);
+
   /**
   * Removes all primary keys that have comparison value smaller than (largestSeenComparisonValue - TTL).
    */
@@ -1327,4 +1295,8 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     // Fall back to local creation time if ZK creation time is not set
     return segmentMetadata.getIndexCreationTime();
   }
+
+  protected abstract int getPrevKeyToRecordLocationSize();
+
+  protected abstract void clearPrevKeyToRecordLocation();
 }
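To make the single previous-location map introduced above concrete, here is a self-contained toy version (plain strings instead of `RecordLocation`; nothing in it is Pinot API): while a consuming segment overwrites keys we remember only each key's prior location, and on revert we restore those locations and drop keys the segment introduced.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model, NOT Pinot code: keys map to the name of the segment holding their
// latest record; a single "previous" map is enough to undo a consuming segment.
public class SingleMapRevertSketch {
  static Map<String, String> revertDemo() {
    Map<String, String> current = new HashMap<>();
    Map<String, String> previous = new HashMap<>();

    current.put("pk1", "committedSeg0");

    // Consuming segment updates pk1 and introduces pk2; only pk1 had a prior location.
    previous.put("pk1", current.put("pk1", "consumingSeg1"));
    current.put("pk2", "consumingSeg1");

    // Revert: restore remembered locations, then drop keys still pointing at the
    // consuming segment (these had no previous location, like pk2).
    current.putAll(previous);
    current.values().removeIf("consumingSeg1"::equals);
    previous.clear();
    return current;
  }

  public static void main(String[] args) {
    System.out.println(revertDemo()); // {pk1=committedSeg0}
  }
}
```

This replaces the earlier two-map design (`_previousKeyToRecordLocationMap` plus `_newlyAddedKeys`): a key absent from the previous map must have been introduced by the consuming segment, so a separate newly-added-keys map is redundant.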
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index 6edc1020150..a6c0721e5fe 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -32,6 +32,7 @@ import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import org.apache.pinot.segment.local.segment.readers.LazyRow;
+import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
 import org.apache.pinot.segment.local.utils.HashUtils;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.MutableSegment;
@@ -55,8 +56,8 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
   @VisibleForTesting
   final ConcurrentHashMap<Object, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
 
-  private final ConcurrentHashMap<Object, RecordLocation> _previousKeyToRecordLocationMap = new ConcurrentHashMap<>();
-  private final Map<Object, RecordLocation> _newlyAddedKeys = new ConcurrentHashMap<>();
+  @VisibleForTesting
+  final ConcurrentHashMap<Object, RecordLocation> _previousKeyToRecordLocationMap = new ConcurrentHashMap<>();
 
   public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, int partitionId, UpsertContext context) {
     super(tableNameWithType, partitionId, context);
@@ -93,12 +94,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
               if (currentSegment == segment) {
                 if (comparisonResult >= 0) {
                   replaceDocId(segment, validDocIds, queryableDocIds, currentDocId, newDocId, recordInfo);
-                  RecordLocation newRecordLocation = new RecordLocation(segment, newDocId, newComparisonValue);
-                  // Track the record location of the newly added keys
-                  if (_newlyAddedKeys.containsKey(primaryKey)) {
-                    _newlyAddedKeys.put(primaryKey, newRecordLocation);
-                  }
-                  return newRecordLocation;
+                  return new RecordLocation(segment, newDocId, newComparisonValue);
                 } else {
                   return currentRecordLocation;
                 }
@@ -121,8 +117,19 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
                       validDocIdsForOldSegment.remove(currentDocId);
                     }
                   }
+                  if (_context.isTableTypeInconsistentDuringConsumption()
+                      && currentSegment instanceof MutableSegment) {
+                    _previousKeyToRecordLocationMap.remove(primaryKey);
+                  }
                   return new RecordLocation(segment, newDocId, newComparisonValue);
                 } else {
+                  RecordLocation prevRecordLocation = _previousKeyToRecordLocationMap.get(primaryKey);
+                  if (_context.isTableTypeInconsistentDuringConsumption() && currentSegment instanceof MutableSegment
+                      && (prevRecordLocation == null
+                      || newComparisonValue.compareTo(prevRecordLocation.getComparisonValue()) >= 0)) {
+                    RecordLocation newRecordLocation = new RecordLocation(segment, newDocId, newComparisonValue);
+                    _previousKeyToRecordLocationMap.put(primaryKey, newRecordLocation);
+                  }
                   return currentRecordLocation;
                 }
               }
@@ -134,8 +141,18 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
                 numKeysInWrongSegment.getAndIncrement();
                 if (comparisonResult >= 0) {
                   addDocId(segment, validDocIds, queryableDocIds, newDocId, recordInfo);
+                  if (_context.isTableTypeInconsistentDuringConsumption() && currentSegment instanceof MutableSegment) {
+                    _previousKeyToRecordLocationMap.remove(primaryKey);
+                  }
                   return new RecordLocation(segment, newDocId, newComparisonValue);
                 } else {
+                  RecordLocation prevRecordLocation = _previousKeyToRecordLocationMap.get(primaryKey);
+                  if (_context.isTableTypeInconsistentDuringConsumption() && currentSegment instanceof MutableSegment
+                      && (prevRecordLocation == null
+                      || newComparisonValue.compareTo(prevRecordLocation.getComparisonValue()) >= 0)) {
+                    RecordLocation newRecordLocation = new RecordLocation(segment, newDocId, newComparisonValue);
+                    _previousKeyToRecordLocationMap.put(primaryKey, newRecordLocation);
+                  }
                   return currentRecordLocation;
                 }
               }
@@ -148,19 +165,24 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
                   currentSegmentName, getAuthoritativeUpdateOrCreationTime(segment),
                   getAuthoritativeUpdateOrCreationTime(currentSegment)))) {
                 replaceDocId(segment, validDocIds, queryableDocIds, currentSegment, currentDocId, newDocId, recordInfo);
-                if (currentSegment != segment) {
-                  _previousKeyToRecordLocationMap.put(primaryKey, currentRecordLocation);
+                if (_context.isTableTypeInconsistentDuringConsumption() && currentSegment instanceof MutableSegment) {
+                  _previousKeyToRecordLocationMap.remove(primaryKey);
                 }
                 return new RecordLocation(segment, newDocId, newComparisonValue);
               } else {
+                RecordLocation prevRecordLocation = _previousKeyToRecordLocationMap.get(primaryKey);
+                if (_context.isTableTypeInconsistentDuringConsumption() && currentSegment instanceof MutableSegment
+                    && (prevRecordLocation == null
+                    || newComparisonValue.compareTo(prevRecordLocation.getComparisonValue()) >= 0)) {
+                  RecordLocation newRecordLocation = new RecordLocation(segment, newDocId, newComparisonValue);
+                  _previousKeyToRecordLocationMap.put(primaryKey, newRecordLocation);
+                }
                 return currentRecordLocation;
               }
             } else {
               // New primary key
               addDocId(segment, validDocIds, queryableDocIds, newDocId, recordInfo);
-              RecordLocation newRecordLocation = new RecordLocation(segment, newDocId, newComparisonValue);
-              _newlyAddedKeys.put(primaryKey, newRecordLocation);
-              return newRecordLocation;
+              return new RecordLocation(segment, newDocId, newComparisonValue);
             }
           });
     }
@@ -192,6 +214,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
       _primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey, _hashFunction),
           (pk, recordLocation) -> {
             if (recordLocation.getSegment() == segment) {
+              if (_context.isTableTypeInconsistentDuringConsumption() && segment instanceof MutableSegment) {
+                _previousKeyToRecordLocationMap.remove(pk);
+              }
               return null;
             }
             return recordLocation;
@@ -199,6 +224,72 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
     }
   }
 
+  @Override
+  protected void revertAndRemoveSegment(IndexSegment segment,
+      Iterator<Map.Entry<Integer, PrimaryKey>> primaryKeyIterator) {
+    while (primaryKeyIterator.hasNext()) {
+      Map.Entry<Integer, PrimaryKey> primaryKeyEntry = primaryKeyIterator.next();
+      PrimaryKey primaryKey = primaryKeyEntry.getValue();
+      int docId = primaryKeyEntry.getKey();
+      _primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey, _hashFunction),
+          (pk, recordLocation) -> {
+            RecordLocation prevLocation = _previousKeyToRecordLocationMap.get(pk);
+            if (prevLocation == null) {
+              _previousKeyToRecordLocationMap.remove(pk);
+              return null;
+            }
+            if (recordLocation.getSegment() == segment) {
+              // Revert to previous segment location
+              IndexSegment prevSegment = prevLocation.getSegment();
+              ThreadSafeMutableRoaringBitmap prevValidDocIds = prevSegment.getValidDocIds();
+              if (prevValidDocIds != null) {
+                try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(prevSegment,
+                    _primaryKeyColumns, _comparisonColumns, _deleteRecordColumn)) {
+                  int prevDocId = prevLocation.getDocId();
+                  RecordInfo recordInfo = recordInfoReader.getRecordInfo(prevDocId);
+                  replaceDocId(prevSegment, prevValidDocIds, prevSegment.getQueryableDocIds(), segment, docId,
+                      prevDocId, recordInfo);
+                  _previousKeyToRecordLocationMap.remove(pk);
+                  return prevLocation;
+                } catch (IOException e) {
+                  _logger.warn("Failed to revert to previous segment: {}, removing key", prevSegment.getSegmentName(),
+                      e);
+                  _previousKeyToRecordLocationMap.remove(pk);
+                  return null;
+                }
+              } else {
+                _previousKeyToRecordLocationMap.remove(pk);
+                return null;
+              }
+            } else if (recordLocation.getSegment() instanceof ImmutableSegmentImpl) {
+              _previousKeyToRecordLocationMap.remove(pk);
+            } else {
+              _logger.warn(
+                  "Consuming segment: {} has already ingested the primary key for docId {} from segment: {}, suggesting"
+                      + " that consumption is occurring concurrently with segment replacement, which is undesirable "
+                      + "for consistency.", recordLocation.getSegment().getSegmentName(), primaryKeyEntry.getKey(),
+                  segment.getSegmentName());
+            }
+            return recordLocation;
+          });
+    }
+  }
+
+  @Override
+  protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds) {
+    try (PrimaryKeyReader primaryKeyReader = new PrimaryKeyReader(segment, _primaryKeyColumns)) {
+      if (shouldRevertMetadataOnInconsistency(segment)) {
+        revertAndRemoveSegment(segment, UpsertUtils.getRecordIterator(primaryKeyReader, validDocIds));
+      } else {
+        removeSegment(segment, UpsertUtils.getPrimaryKeyIterator(primaryKeyReader, validDocIds));
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(
+          String.format("Caught exception while removing segment: %s, table: %s, message: %s", segment.getSegmentName(),
+              _tableNameWithType, e.getMessage()), e);
+    }
+  }
+
   @Override
   public void doRemoveExpiredPrimaryKeys() {
     AtomicInteger numMetadataTTLKeysRemoved = new AtomicInteger();
@@ -260,57 +351,13 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
   }
 
   @Override
-  protected void revertCurrentSegmentUpsertMetadata(IndexSegment oldSegment, ThreadSafeMutableRoaringBitmap validDocIds,
-      ThreadSafeMutableRoaringBitmap queryableDocIds) {
-    // Revert to previous locations present in other segment
-    // Replace the valid doc id to that segment location
-    _logger.info("Reverting Upsert metadata for {} keys for the segment: {}", _previousKeyToRecordLocationMap.size(),
-        oldSegment.getSegmentName());
-    for (Map.Entry<Object, RecordLocation> obj : _previousKeyToRecordLocationMap.entrySet()) {
-      IndexSegment prevSegment = obj.getValue().getSegment();
-      RecordLocation currentLocation = _primaryKeyToRecordLocationMap.get(obj.getKey());
-      if (prevSegment != null) {
-        if (currentLocation != null && currentLocation.getSegment() == oldSegment) {
-          try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(prevSegment,
-              _primaryKeyColumns, _comparisonColumns, _deleteRecordColumn)) {
-            int newDocId = obj.getValue().getDocId();
-            int currentDocId = _primaryKeyToRecordLocationMap.get(obj.getKey()).getDocId();
-            RecordInfo recordInfo = recordInfoReader.getRecordInfo(newDocId);
-            replaceDocId(prevSegment, prevSegment.getValidDocIds(), prevSegment.getQueryableDocIds(), oldSegment,
-                currentDocId, newDocId, recordInfo);
-          } catch (IOException e) {
-            throw new RuntimeException(e);
-          }
-          _primaryKeyToRecordLocationMap.put(obj.getKey(), obj.getValue());
-        }
-      } else {
-        _primaryKeyToRecordLocationMap.remove(obj.getKey());
-      }
-    }
-    _logger.info("Reverted Upsert metadata of {} keys to previous segment locations for the segment: {}",
-        _previousKeyToRecordLocationMap.size(), oldSegment.getSegmentName());
-    removeNewlyAddedKeys(oldSegment);
-  }
-
-  @Override
-  protected void removeNewlyAddedKeys(IndexSegment oldSegment) {
-    // Remove the newly added keys in the metadata map and in the valid doc ids
-    int removedKeys = 0;
-    for (Map.Entry<Object, RecordLocation> entry : _newlyAddedKeys.entrySet()) {
-      if (entry.getValue().getSegment() == oldSegment) {
-        _primaryKeyToRecordLocationMap.remove(entry.getKey());
-        removeDocId(oldSegment, entry.getValue().getDocId());
-        removedKeys++;
-      }
-    }
-    _logger.info("Removed newly added {} keys for the segment: {} out of : {}", removedKeys,
-        oldSegment.getSegmentName(), _previousKeyToRecordLocationMap.size());
+  protected int getPrevKeyToRecordLocationSize() {
+    return _previousKeyToRecordLocationMap.size();
   }
 
   @Override
-  protected void eraseKeyToPreviousLocationMap() {
+  protected void clearPrevKeyToRecordLocation() {
     _previousKeyToRecordLocationMap.clear();
-    _newlyAddedKeys.clear();
   }
 
   @Override
@@ -326,7 +373,6 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
       double comparisonValue = ((Number) newComparisonValue).doubleValue();
       _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, 
comparisonValue));
     }
-
     
_primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(),
 _hashFunction),
         (primaryKey, currentRecordLocation) -> {
           if (currentRecordLocation != null) {
@@ -339,19 +385,18 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
               RecordLocation newRecordLocation = new RecordLocation(segment, 
newDocId, newComparisonValue);
               if (segment == currentSegment) {
                 replaceDocId(segment, validDocIds, queryableDocIds, 
currentDocId, newDocId, recordInfo);
-                // Track the record location of the newly added keys
-                if (_newlyAddedKeys.containsKey(primaryKey)) {
-                  _newlyAddedKeys.put(primaryKey, newRecordLocation);
-                }
               } else {
-                _previousKeyToRecordLocationMap.put(primaryKey, 
currentRecordLocation);
+                // For consistent ParallelSegmentConsumptionPolicy like 
DISALLOW_ALWAYS and ALLOW_DURING_BUILD_ONLY,
+                // we shouldn't be seeing two mutable segments, so ideally 
current segment shouldn't point to
+                // Mutable segment, unless other modes are enabled which could 
lead to inconsistent behaviour
+                if (_context.isTableTypeInconsistentDuringConsumption()
+                    && !(currentSegment instanceof MutableSegment)) {
+                  _previousKeyToRecordLocationMap.put(primaryKey, 
currentRecordLocation);
+                }
                 replaceDocId(segment, validDocIds, queryableDocIds, 
currentSegment, currentDocId, newDocId, recordInfo);
               }
               return newRecordLocation;
             } else {
-              if (segment != currentSegment) {
-                _previousKeyToRecordLocationMap.put(primaryKey, 
currentRecordLocation);
-              }
               // Out-of-order record
               
handleOutOfOrderEvent(currentRecordLocation.getComparisonValue(), 
recordInfo.getComparisonValue());
               isOutOfOrderRecord.set(true);
@@ -360,9 +405,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
           } else {
             // New primary key
             addDocId(segment, validDocIds, queryableDocIds, newDocId, 
recordInfo);
-            RecordLocation newRecordLocation = new RecordLocation(segment, 
newDocId, newComparisonValue);
-            _newlyAddedKeys.put(primaryKey, newRecordLocation);
-            return newRecordLocation;
+            return new RecordLocation(segment, newDocId, newComparisonValue);
           }
         });
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
index 6e42097ee9d..7535da9d3e7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
@@ -69,13 +69,10 @@ public class ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
   final ConcurrentHashMap<Object, ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation>
       _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
 
-  private final ConcurrentHashMap<Object,
-      ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation>
+  @VisibleForTesting
+  final ConcurrentHashMap<Object, ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation>
       _previousKeyToRecordLocationMap = new ConcurrentHashMap<>();
 
-  private final Map<Object, ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation>
-      _newlyAddedKeys = new ConcurrentHashMap<>();
-
   // Used to initialize a reference to previous row for merging in partial upsert
   private final LazyRow _reusePreviousRow = new LazyRow();
   private final Map<String, Object> _reuseMergeResultHolder = new HashMap<>();
@@ -124,13 +121,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
              if (currentSegment == segment) {
                if (comparisonResult >= 0) {
                  replaceDocId(segment, validDocIds, queryableDocIds, currentDocId, newDocId, recordInfo);
-                  RecordLocation newRecordLocation = new RecordLocation(segment,
-                      newDocId, newComparisonValue, currentDistinctSegmentCount);
-                  // Track the record location of the newly added keys
-                  if (_newlyAddedKeys.containsKey(primaryKey)) {
-                    _newlyAddedKeys.put(primaryKey, newRecordLocation);
-                  }
-                  return newRecordLocation;
+                  return new RecordLocation(segment, newDocId, newComparisonValue, currentDistinctSegmentCount);
                } else {
                  return currentRecordLocation;
                }
@@ -153,9 +144,22 @@ public class ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
                      validDocIdsForOldSegment.remove(currentDocId);
                    }
                  }
+                  if (_context.isTableTypeInconsistentDuringConsumption()
+                      && currentSegment instanceof MutableSegment) {
+                    _previousKeyToRecordLocationMap.remove(primaryKey);
+                  }
                  return new RecordLocation(segment, newDocId, newComparisonValue,
                      RecordLocation.incrementSegmentCount(currentDistinctSegmentCount));
                } else {
+                  if (_context.isTableTypeInconsistentDuringConsumption()) {
+                    RecordLocation prevRecordLocation = _previousKeyToRecordLocationMap.get(primaryKey);
+                    if (currentSegment instanceof MutableSegment && (prevRecordLocation == null
+                        || newComparisonValue.compareTo(prevRecordLocation.getComparisonValue()) >= 0)) {
+                      RecordLocation newRecordLocation =
+                          new RecordLocation(segment, newDocId, newComparisonValue, currentDistinctSegmentCount);
+                      _previousKeyToRecordLocationMap.put(primaryKey, newRecordLocation);
+                    }
+                  }
                  return new RecordLocation(currentSegment, currentDocId, currentComparisonValue,
                      RecordLocation.incrementSegmentCount(currentDistinctSegmentCount));
                }
@@ -168,9 +172,21 @@ public class ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
                numKeysInWrongSegment.getAndIncrement();
                if (comparisonResult >= 0) {
                  addDocId(segment, validDocIds, queryableDocIds, newDocId, recordInfo);
+                  if (_context.isTableTypeInconsistentDuringConsumption() && currentSegment instanceof MutableSegment) {
+                    _previousKeyToRecordLocationMap.remove(primaryKey);
+                  }
                  return new RecordLocation(segment, newDocId, newComparisonValue,
                      RecordLocation.incrementSegmentCount(currentDistinctSegmentCount));
                } else {
+                  if (_context.isTableTypeInconsistentDuringConsumption()) {
+                    RecordLocation prevRecordLocation = _previousKeyToRecordLocationMap.get(primaryKey);
+                    if (currentSegment instanceof MutableSegment && (prevRecordLocation == null
+                        || newComparisonValue.compareTo(prevRecordLocation.getComparisonValue()) >= 0)) {
+                      RecordLocation newRecordLocation =
+                          new RecordLocation(segment, newDocId, newComparisonValue, currentDistinctSegmentCount);
+                      _previousKeyToRecordLocationMap.put(primaryKey, newRecordLocation);
+                    }
+                  }
                  return currentRecordLocation;
                }
              }
@@ -183,18 +199,28 @@ public class ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
                  currentSegmentName, getAuthoritativeUpdateOrCreationTime(segment),
                  getAuthoritativeUpdateOrCreationTime(currentSegment)))) {
                replaceDocId(segment, validDocIds, queryableDocIds, currentSegment, currentDocId, newDocId, recordInfo);
+                if (_context.isTableTypeInconsistentDuringConsumption() && currentSegment instanceof MutableSegment) {
+                  _previousKeyToRecordLocationMap.remove(primaryKey);
+                }
                return new RecordLocation(segment, newDocId, newComparisonValue,
                    RecordLocation.incrementSegmentCount(currentDistinctSegmentCount));
              } else {
+                if (_context.isTableTypeInconsistentDuringConsumption()) {
+                  RecordLocation prevRecordLocation = _previousKeyToRecordLocationMap.get(primaryKey);
+                  if (currentSegment instanceof MutableSegment && (prevRecordLocation == null
+                      || newComparisonValue.compareTo(prevRecordLocation.getComparisonValue()) >= 0)) {
+                    RecordLocation newRecordLocation =
+                        new RecordLocation(segment, newDocId, newComparisonValue, currentDistinctSegmentCount);
+                    _previousKeyToRecordLocationMap.put(primaryKey, newRecordLocation);
+                  }
+                }
                return new RecordLocation(currentSegment, currentDocId, currentComparisonValue,
                    RecordLocation.incrementSegmentCount(currentDistinctSegmentCount));
              }
            } else {
              // New primary key
              addDocId(segment, validDocIds, queryableDocIds, newDocId, recordInfo);
-              RecordLocation newRecordLocation = new RecordLocation(segment, newDocId, newComparisonValue, 1);
-              _newlyAddedKeys.put(primaryKey, newRecordLocation);
-              return newRecordLocation;
+              return new RecordLocation(segment, newDocId, newComparisonValue, 1);
            }
          });
    }
@@ -217,16 +243,21 @@ public class ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
     _logger.info("Removing {} segment: {}, current primary key count: {}",
         segment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, getNumPrimaryKeys());
     long startTimeMs = System.currentTimeMillis();
-
+    // For ConsistentDeletes, we need to iterate over ALL docs in the segment (not just valid ones)
+    // to properly decrement distinctSegmentCount for every key that was ever in the segment
     try (PrimaryKeyReader primaryKeyReader = new PrimaryKeyReader(segment, _primaryKeyColumns)) {
-      removeSegment(segment,
-          UpsertUtils.getPrimaryKeyIterator(primaryKeyReader, segment.getSegmentMetadata().getTotalDocs()));
+      if (shouldRevertMetadataOnInconsistency(segment)) {
+        revertAndRemoveSegment(segment,
+            UpsertUtils.getRecordIterator(primaryKeyReader, segment.getSegmentMetadata().getTotalDocs()));
+      } else {
+        removeSegment(segment,
+            UpsertUtils.getPrimaryKeyIterator(primaryKeyReader, segment.getSegmentMetadata().getTotalDocs()));
+      }
     } catch (Exception e) {
       throw new RuntimeException(
           String.format("Caught exception while removing segment: %s, table: %s", segment.getSegmentName(),
               _tableNameWithType), e);
     }
-
     // Update metrics
     long numPrimaryKeys = getNumPrimaryKeys();
     updatePrimaryKeyGauge(numPrimaryKeys);
@@ -234,6 +265,20 @@ public class ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
         System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
   }
 
+  protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds) {
+    try (PrimaryKeyReader primaryKeyReader = new PrimaryKeyReader(segment, _primaryKeyColumns)) {
+      if (shouldRevertMetadataOnInconsistency(segment)) {
+        revertAndRemoveSegment(segment, UpsertUtils.getRecordIterator(primaryKeyReader, validDocIds));
+      } else {
+        removeSegment(segment, UpsertUtils.getPrimaryKeyIterator(primaryKeyReader, validDocIds));
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(
+          String.format("Caught exception while removing segment: %s, table: %s, message: %s", segment.getSegmentName(),
+              _tableNameWithType, e.getMessage()), e);
+    }
+  }
+
   @Override
   public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
       @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, @Nullable Iterator<RecordInfo> recordInfoIterator,
@@ -258,8 +303,14 @@ public class ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
             oldSegment, validDocIdsForOldSegment);
       }
       if (validDocIdsForOldSegment != null && !validDocIdsForOldSegment.isEmpty()) {
-        checkForInconsistencies(segment, validDocIds, queryableDocIds, oldSegment, validDocIdsForOldSegment,
-            segmentName);
+        if (_context.isTableTypeInconsistentDuringConsumption()) {
+          if (shouldRevertMetadataOnInconsistency(oldSegment)) {
+            revertSegmentUpsertMetadata(oldSegment, segmentName, validDocIdsForOldSegment);
+            return;
+          } else {
+            logInconsistentResults(segmentName, validDocIdsForOldSegment.getCardinality());
+          }
+        }
       }
       // we want to always remove a segment in case of enableDeletedKeysCompactionConsistency = true
       // this is to account for the removal of primary-key in the to-be-removed segment and reduce
@@ -271,32 +322,87 @@ public class ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
   }
 
   @Override
-  protected void removeNewlyAddedKeys(IndexSegment oldSegment) {
-    int removedKeys = 0;
-    for (Map.Entry<Object, RecordLocation> entry : _newlyAddedKeys.entrySet()) {
-      if (entry.getValue().getSegment() == oldSegment) {
-        _primaryKeyToRecordLocationMap.remove(entry.getKey());
-        removeDocId(oldSegment, entry.getValue().getDocId());
-        removedKeys++;
-      }
+  protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey> primaryKeyIterator) {
+    // We need to decrease the distinctSegmentCount for each unique primary key in this deleting segment by 1
+    // as the occurrence of the key in this segment is being removed. We are taking a set of unique primary keys
+    // to avoid double counting the same key in the same segment.
+    Set<Object> uniquePrimaryKeys = new HashSet<>();
+    while (primaryKeyIterator.hasNext()) {
+      PrimaryKey primaryKey = primaryKeyIterator.next();
+      _primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey, _hashFunction),
+          (pk, recordLocation) -> {
+            if (recordLocation.getSegment() == segment) {
+              if (_context.isTableTypeInconsistentDuringConsumption() && segment instanceof MutableSegment) {
+                _previousKeyToRecordLocationMap.remove(pk);
+              }
+              return null;
+            }
+            if (!uniquePrimaryKeys.add(pk)) {
+              return recordLocation;
+            }
+            return new RecordLocation(recordLocation.getSegment(), recordLocation.getDocId(),
+                recordLocation.getComparisonValue(),
+                RecordLocation.decrementSegmentCount(recordLocation.getDistinctSegmentCount()));
+          });
     }
-    _logger.info("Removed newly added {} keys for the segment: {} out of : {}", removedKeys,
-        oldSegment.getSegmentName(), _previousKeyToRecordLocationMap.size());
   }
 
   @Override
-  protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey> primaryKeyIterator) {
+  protected void revertAndRemoveSegment(IndexSegment segment,
+      Iterator<Map.Entry<Integer, PrimaryKey>> primaryKeyIterator) {
     // We need to decrease the distinctSegmentCount for each unique primary key in this deleting segment by 1
     // as the occurrence of the key in this segment is being removed. We are taking a set of unique primary keys
     // to avoid double counting the same key in the same segment.
     Set<Object> uniquePrimaryKeys = new HashSet<>();
     while (primaryKeyIterator.hasNext()) {
-      PrimaryKey primaryKey = primaryKeyIterator.next();
+      Map.Entry<Integer, PrimaryKey> primaryKeyEntry = primaryKeyIterator.next();
+      PrimaryKey primaryKey = primaryKeyEntry.getValue();
+      int docId = primaryKeyEntry.getKey();
      _primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey, _hashFunction),
          (pk, recordLocation) -> {
-            if (recordLocation.getSegment() == segment) {
+            RecordLocation prevLocation = _previousKeyToRecordLocationMap.get(pk);
+            if (prevLocation == null) {
+              _previousKeyToRecordLocationMap.remove(pk);
              return null;
            }
+            if (recordLocation.getSegment() == segment) {
+              // Revert to previous segment location
+              IndexSegment prevSegment = prevLocation.getSegment();
+              ThreadSafeMutableRoaringBitmap prevValidDocIds = prevSegment.getValidDocIds();
+              if (prevValidDocIds != null) {
+                try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(prevSegment,
+                    _primaryKeyColumns, _comparisonColumns, _deleteRecordColumn)) {
+                  int prevDocId = prevLocation.getDocId();
+                  RecordInfo recordInfo = recordInfoReader.getRecordInfo(prevDocId);
+                  replaceDocId(prevSegment, prevValidDocIds, prevSegment.getQueryableDocIds(), segment, docId,
+                      prevDocId, recordInfo);
+                  _previousKeyToRecordLocationMap.remove(pk);
+                  if (!uniquePrimaryKeys.add(pk)) {
+                    return prevLocation;
+                  }
+                  return new RecordLocation(prevLocation.getSegment(), prevLocation.getDocId(),
+                      prevLocation.getComparisonValue(),
+                      RecordLocation.decrementSegmentCount(prevLocation.getDistinctSegmentCount()));
+                } catch (IOException e) {
+                  _logger.warn("Failed to revert to previous segment: {}, removing key", prevSegment.getSegmentName(),
+                      e);
+                  _previousKeyToRecordLocationMap.remove(pk);
+                  return null;
+                }
+              } else {
+                _previousKeyToRecordLocationMap.remove(pk);
+                return null;
+              }
+            } else if (recordLocation.getSegment() instanceof ImmutableSegmentImpl) {
+              // The consuming segment's key is in a different immutable segment
+              _previousKeyToRecordLocationMap.remove(pk);
+            } else {
+              _logger.warn(
+                  "Consuming segment {} has already ingested the primary key for docId {} from segment {}, suggesting"
+                      + " that consumption is occurring concurrently with segment replacement, which is undesirable "
+                      + "for consistency.", recordLocation.getSegment().getSegmentName(), primaryKeyEntry.getKey(),
+                  segment.getSegmentName());
+            }
            if (!uniquePrimaryKeys.add(pk)) {
              return recordLocation;
            }
@@ -369,42 +475,13 @@ public class ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
   }
 
   @Override
-  protected void revertCurrentSegmentUpsertMetadata(IndexSegment oldSegment, ThreadSafeMutableRoaringBitmap validDocIds,
-      ThreadSafeMutableRoaringBitmap queryableDocIds) {
-    _logger.info("Reverting Upsert metadata for {} keys", _previousKeyToRecordLocationMap.size());
-    // Revert to previous locations present in other segment
-    for (Map.Entry<Object, RecordLocation> obj : _previousKeyToRecordLocationMap.entrySet()) {
-      IndexSegment prevSegment = obj.getValue().getSegment();
-      RecordLocation currentLocation = _primaryKeyToRecordLocationMap.get(obj.getKey());
-      if (prevSegment != null) {
-        if (currentLocation != null && currentLocation.getSegment() == oldSegment) {
-          try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(prevSegment,
-              _primaryKeyColumns, _comparisonColumns, _deleteRecordColumn)) {
-            int newDocId = obj.getValue().getDocId();
-            int currentDocId = _primaryKeyToRecordLocationMap.get(obj.getKey()).getDocId();
-            RecordInfo recordInfo = recordInfoReader.getRecordInfo(newDocId);
-            // Update valid docId to the other segment location
-            replaceDocId(prevSegment, prevSegment.getValidDocIds(), prevSegment.getQueryableDocIds(), oldSegment,
-                currentDocId, newDocId, recordInfo);
-          } catch (IOException e) {
-            throw new RuntimeException(e);
-          }
-          _primaryKeyToRecordLocationMap.put(obj.getKey(), obj.getValue());
-        }
-      } else {
-        _primaryKeyToRecordLocationMap.remove(obj.getKey());
-      }
-    }
-    _logger.info("Reverted Upsert metadata of {} keys to previous segment locations for the segment: {}",
-        _previousKeyToRecordLocationMap.size(), oldSegment.getSegmentName());
-    // For the newly added keys into the segment, remove the pk and valid doc id
-    removeNewlyAddedKeys(oldSegment);
+  protected int getPrevKeyToRecordLocationSize() {
+    return _previousKeyToRecordLocationMap.size();
   }
 
   @Override
-  protected void eraseKeyToPreviousLocationMap() {
+  protected void clearPrevKeyToRecordLocation() {
     _previousKeyToRecordLocationMap.clear();
-    _newlyAddedKeys.clear();
   }
 
   @Override
@@ -432,15 +509,16 @@ public class ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
              int currentDocId = currentRecordLocation.getDocId();
              if (segment == currentSegment) {
                replaceDocId(segment, validDocIds, queryableDocIds, currentDocId, newDocId, recordInfo);
-                RecordLocation newRecordLocation = new RecordLocation(segment, newDocId, newComparisonValue,
+                return new RecordLocation(segment, newDocId, newComparisonValue,
                    currentRecordLocation.getDistinctSegmentCount());
-                // Track the record location of the newly added keys
-                if (_newlyAddedKeys.containsKey(primaryKey)) {
-                  _newlyAddedKeys.put(primaryKey, newRecordLocation);
-                }
-                return newRecordLocation;
              } else {
-                _previousKeyToRecordLocationMap.put(primaryKey, currentRecordLocation);
+                // For consistent ParallelSegmentConsumptionPolicy like DISALLOW_ALWAYS and ALLOW_DURING_BUILD_ONLY,
+                // we shouldn't be seeing two mutable segments, so ideally current segment shouldn't point to
+                // mutable segment, unless other modes are enabled which could lead to inconsistent behaviour
+                if (_context.isTableTypeInconsistentDuringConsumption()
+                    && !(currentSegment instanceof MutableSegment)) {
+                  _previousKeyToRecordLocationMap.put(primaryKey, currentRecordLocation);
+                }
                replaceDocId(segment, validDocIds, queryableDocIds, currentSegment, currentDocId, newDocId, recordInfo);
                return new RecordLocation(segment, newDocId, newComparisonValue,
                    RecordLocation.incrementSegmentCount(currentRecordLocation.getDistinctSegmentCount()));
@@ -461,9 +539,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
          } else {
            // New primary key
            addDocId(segment, validDocIds, queryableDocIds, newDocId, recordInfo);
-            RecordLocation newRecordLocation = new RecordLocation(segment, newDocId, newComparisonValue, 1);
-            _newlyAddedKeys.put(primaryKey, newRecordLocation);
-            return newRecordLocation;
+            return new RecordLocation(segment, newDocId, newComparisonValue, 1);
          }
        });
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
index 6ef76c8a66d..7bbb4d98143 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
@@ -192,6 +192,16 @@ public class UpsertContext {
     return _tableIndexDir;
   }
 
+  /**
+   * Returns true if the table configuration has settings that can lead to inconsistent upsert metadata
+   * during segment replacement after force commit. This happens when:
+   * - Partial upsert is enabled (records need to be merged with previous values)
+   * - dropOutOfOrderRecord is enabled with NONE consistency mode (records may have been dropped)
+   */
+  public boolean isTableTypeInconsistentDuringConsumption() {
+    return _dropOutOfOrderRecord || _outOfOrderRecordColumn != null || _partialUpsertHandler != null;
+  }
+
   @Override
   public String toString() {
     return new ToStringBuilder(this)
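[editor's note] The new UpsertContext.isTableTypeInconsistentDuringConsumption() above is a pure predicate over three config-derived fields. A minimal standalone sketch of the same logic follows; the class name and plain boolean parameters are illustrative stand-ins for _dropOutOfOrderRecord, _outOfOrderRecordColumn != null, and _partialUpsertHandler != null, not Pinot API:

```java
// Sketch of the inconsistency predicate added to UpsertContext in the hunk above.
// The three booleans stand in for the context fields; names are hypothetical.
public class UpsertInconsistencyCheckSketch {
  public static boolean isTableTypeInconsistentDuringConsumption(
      boolean dropOutOfOrderRecord, boolean hasOutOfOrderRecordColumn, boolean hasPartialUpsertHandler) {
    // Any one of these settings means the consuming segment's metadata cannot be
    // rebuilt deterministically on replay, so the revert path must be taken.
    return dropOutOfOrderRecord || hasOutOfOrderRecordColumn || hasPartialUpsertHandler;
  }

  public static void main(String[] args) {
    // Full upsert, no drops: no revert path needed.
    System.out.println(isTableTypeInconsistentDuringConsumption(false, false, false)); // prints false
    // Partial upsert merges with previous values, so dropped state is unrecoverable.
    System.out.println(isTableTypeInconsistentDuringConsumption(false, false, true)); // prints true
  }
}
```

A single true input is enough to flip the result, which is why the commit gates every _previousKeyToRecordLocationMap update on this check rather than tracking previous locations unconditionally.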
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
index bedea992fd6..b508a02aeb0 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
@@ -132,7 +133,6 @@ public class UpsertUtils {
       MutableRoaringBitmap validDocIds) {
     return new Iterator<>() {
       private final PeekableIntIterator _docIdIterator = validDocIds.getIntIterator();
-
       @Override
       public boolean hasNext() {
         return _docIdIterator.hasNext();
@@ -165,6 +165,46 @@ public class UpsertUtils {
     };
   }
 
+  public static Iterator<Map.Entry<Integer, PrimaryKey>> getRecordIterator(PrimaryKeyReader primaryKeyReader,
+      MutableRoaringBitmap validDocIds) {
+
+    return new Iterator<>() {
+      private final PeekableIntIterator _docIdIterator = validDocIds.getIntIterator();
+
+      @Override
+      public boolean hasNext() {
+        return _docIdIterator.hasNext();
+      }
+
+      @Override
+      public Map.Entry<Integer, PrimaryKey> next() {
+        int docId = _docIdIterator.next();
+        return Map.entry(docId, primaryKeyReader.getPrimaryKey(docId));
+      }
+    };
+  }
+
+  /**
+   * Returns an iterator of docId and {@link PrimaryKey} for all the documents from the segment.
+   */
+  public static Iterator<Map.Entry<Integer, PrimaryKey>> getRecordIterator(PrimaryKeyReader primaryKeyReader,
+      int numDocs) {
+    return new Iterator<>() {
+      private int _docId = 0;
+
+      @Override
+      public boolean hasNext() {
+        return _docId < numDocs;
+      }
+
+      @Override
+      public Map.Entry<Integer, PrimaryKey> next() {
+        int docId = _docId++;
+        return Map.entry(docId, primaryKeyReader.getPrimaryKey(docId));
+      }
+    };
+  }
+
   public static class RecordInfoReader implements Closeable {
     private final PrimaryKeyReader _primaryKeyReader;
     private final ComparisonColumnReader _comparisonColumnReader;
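[editor's note] The two getRecordIterator overloads added to UpsertUtils above both yield (docId, PrimaryKey) pairs so that revertAndRemoveSegment can map each removed doc back to its key. The numDocs-based variant can be sketched standalone as follows, with a plain String[] standing in for PrimaryKeyReader (class and parameter names here are illustrative, not Pinot API):

```java
import java.util.Iterator;
import java.util.Map;

public class RecordIteratorSketch {
  // Mirrors getRecordIterator(primaryKeyReader, numDocs): yields (docId, key) pairs
  // for every document in order, so callers can revert per-doc state key by key.
  public static Iterator<Map.Entry<Integer, String>> getRecordIterator(String[] keys, int numDocs) {
    return new Iterator<>() {
      private int _docId = 0;

      @Override
      public boolean hasNext() {
        return _docId < numDocs;
      }

      @Override
      public Map.Entry<Integer, String> next() {
        int docId = _docId++;
        // Map.entry gives an immutable (docId, key) pair without allocating a map
        return Map.entry(docId, keys[docId]);
      }
    };
  }

  public static void main(String[] args) {
    Iterator<Map.Entry<Integer, String>> it = getRecordIterator(new String[]{"pk0", "pk1"}, 2);
    while (it.hasNext()) {
      Map.Entry<Integer, String> e = it.next();
      System.out.println(e.getKey() + " -> " + e.getValue()); // prints "0 -> pk0" then "1 -> pk1"
    }
  }
}
```

The validDocIds overload follows the same shape but walks a RoaringBitmap iterator instead of a dense 0..numDocs range, which is why both return the same Map.Entry element type.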
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index e8ee6c588e3..80523c83f37 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -1795,15 +1795,16 @@ public final class TableConfigUtils {
    * @param tableConfig the table config to check, may be null
    * @return true if the table has inconsistent state configs, false if tableConfig is null or no issues found
    */
-  public static boolean checkForInconsistentStateConfigs(@Nullable TableConfig tableConfig) {
+  public static boolean isTableTypeInconsistentDuringConsumption(@Nullable TableConfig tableConfig) {
+    if (tableConfig == null) {
+      return false;
+    }
     UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
     if (upsertConfig == null) {
       return false;
     }
-    return tableConfig.getReplication() > 1 && (
-        upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL
-            || (upsertConfig.isDropOutOfOrderRecord()
-            && upsertConfig.getConsistencyMode() == UpsertConfig.ConsistencyMode.NONE));
+    return (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL || upsertConfig.isDropOutOfOrderRecord()
+        || upsertConfig.getOutOfOrderRecordColumn() != null);
   }
 
   // enum of all the skip-able validation types.
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
index c07d3e41cd1..8435d013978 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
@@ -178,24 +178,21 @@ public class BasePartitionUpsertMetadataManagerTest {
     List<String> segmentsTakenSnapshot = new ArrayList<>();
 
     File segDir01 = new File(TEMP_DIR, "seg01");
-    ImmutableSegmentImpl seg01 =
-        createImmutableSegment("seg01", segDir01, segmentsTakenSnapshot, new ArrayList<>());
+    ImmutableSegmentImpl seg01 = createImmutableSegment("seg01", segDir01, segmentsTakenSnapshot, new ArrayList<>());
     seg01.enableUpsert(upsertMetadataManager, createDocIds(0, 1, 2, 3), null);
     upsertMetadataManager.addSegment(seg01);
     // seg01 has a tmp snapshot file, but no snapshot file
     FileUtils.touch(new File(segDir01, V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME + "_tmp"));
 
     File segDir02 = new File(TEMP_DIR, "seg02");
-    ImmutableSegmentImpl seg02 =
-        createImmutableSegment("seg02", segDir02, segmentsTakenSnapshot, new ArrayList<>());
+    ImmutableSegmentImpl seg02 = createImmutableSegment("seg02", segDir02, segmentsTakenSnapshot, new ArrayList<>());
     seg02.enableUpsert(upsertMetadataManager, createDocIds(0, 1, 2, 3, 4, 5), null);
     upsertMetadataManager.addSegment(seg02);
     // seg02 has snapshot file, so its snapshot is taken first.
     FileUtils.touch(new File(segDir02, V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME));
 
     File segDir03 = new File(TEMP_DIR, "seg03");
-    ImmutableSegmentImpl seg03 =
-        createImmutableSegment("seg03", segDir03, segmentsTakenSnapshot, new ArrayList<>());
+    ImmutableSegmentImpl seg03 = createImmutableSegment("seg03", segDir03, segmentsTakenSnapshot, new ArrayList<>());
     seg03.enableUpsert(upsertMetadataManager, createDocIds(3, 4, 7), null);
     upsertMetadataManager.addSegment(seg03);
 
@@ -369,8 +366,7 @@ public class BasePartitionUpsertMetadataManagerTest {
 
     File segDir01 = new File(TEMP_DIR, "seg01");
     ImmutableSegmentImpl seg01 =
-        createImmutableSegment("seg01", segDir01, validDocIdsSegmentsTaken,
-            queryableDocIdsSegmentsTaken);
+        createImmutableSegment("seg01", segDir01, validDocIdsSegmentsTaken, queryableDocIdsSegmentsTaken);
     ThreadSafeMutableRoaringBitmap queryableDocIds01 = createDocIds(0, 1, 2); // Some docs excluded due to deletes
     seg01.enableUpsert(upsertMetadataManager, createDocIds(0, 1, 2, 3), queryableDocIds01);
     upsertMetadataManager.trackSegment(seg01);
@@ -381,8 +377,7 @@ public class BasePartitionUpsertMetadataManagerTest {
 
     File segDir02 = new File(TEMP_DIR, "seg02");
     ImmutableSegmentImpl seg02 =
-        createImmutableSegment("seg02", segDir02, validDocIdsSegmentsTaken,
-            queryableDocIdsSegmentsTaken);
+        createImmutableSegment("seg02", segDir02, validDocIdsSegmentsTaken, queryableDocIdsSegmentsTaken);
     ThreadSafeMutableRoaringBitmap queryableDocIds02 = createDocIds(0, 2, 3, 5); // Some docs excluded due to deletes
     seg02.enableUpsert(upsertMetadataManager, createDocIds(0, 1, 2, 3, 4, 5), queryableDocIds02);
     upsertMetadataManager.trackSegment(seg02);
@@ -393,8 +388,7 @@ public class BasePartitionUpsertMetadataManagerTest {
 
     File segDir03 = new File(TEMP_DIR, "seg03");
     ImmutableSegmentImpl seg03 =
-        createImmutableSegment("seg03", segDir03, validDocIdsSegmentsTaken,
-            queryableDocIdsSegmentsTaken);
+        createImmutableSegment("seg03", segDir03, validDocIdsSegmentsTaken, queryableDocIdsSegmentsTaken);
     ThreadSafeMutableRoaringBitmap queryableDocIds03 = createDocIds(3, 7); // Some docs excluded due to deletes
     seg03.enableUpsert(upsertMetadataManager, createDocIds(3, 4, 7), queryableDocIds03);
     upsertMetadataManager.trackSegment(seg03);
@@ -1050,10 +1044,6 @@ public class BasePartitionUpsertMetadataManagerTest {
       return false;
     }
 
-    @Override
-    protected void removeNewlyAddedKeys(IndexSegment oldSegment) {
-    }
-
     @Override
     protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds) {
     }
@@ -1063,17 +1053,26 @@ public class BasePartitionUpsertMetadataManagerTest {
       return null;
     }
 
+    @Override
+    protected void revertAndRemoveSegment(IndexSegment segment,
+        Iterator<Map.Entry<Integer, PrimaryKey>> primaryKeyIterator) {
+    }
+
+    @Override
+    protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey> primaryKeyIterator) {
+    }
+
     @Override
     protected void doRemoveExpiredPrimaryKeys() {
     }
 
     @Override
-    protected void revertCurrentSegmentUpsertMetadata(IndexSegment oldSegment,
-        ThreadSafeMutableRoaringBitmap validDocIds, ThreadSafeMutableRoaringBitmap queryableDocIds) {
+    protected int getPrevKeyToRecordLocationSize() {
+      return 0;
     }
 
     @Override
-    protected void eraseKeyToPreviousLocationMap() {
+    protected void clearPrevKeyToRecordLocation() {
     }
   }
 }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 9e3beaf4bbb..681f7d53bc3 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -66,6 +66,7 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.BytesUtils;
+import org.apache.pinot.spi.utils.ConsumingSegmentConsistencyModeListener;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -1968,7 +1969,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
   }
 
   @Test
-  public void testPartialUpsertOldSegmentTriggerReversion() throws IOException {
+  public void testPartialUpsertOldSegmentTriggerReversion()
+      throws IOException {
     // Test partial upserts with consuming (mutable) segment being sealed - revert should be triggered
     // Note: Revert logic only applies when sealing a consuming segment, not for immutable segment replacement
     PartialUpsertHandler mockPartialUpsertHandler = mock(PartialUpsertHandler.class);
@@ -1999,8 +2001,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     int[] timestamps2 = new int[]{150, 350};
     ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
     List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
-    ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1, validDocIds2, null,
-        primaryKeysList2, timestamps2);
+    ImmutableSegmentImpl segment2 =
+        mockImmutableSegmentWithTimestamps(1, validDocIds2, null, primaryKeysList2, timestamps2);
 
     // Replace mutable with immutable (consuming segment seal) - revert SHOULD be triggered
     upsertMetadataManager.replaceSegment(segment2, validDocIds2, null,
@@ -2012,8 +2014,6 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     checkRecordLocation(recordLocationMap, 1, segment2, 0, 150, HashFunction.NONE);
     checkRecordLocation(recordLocationMap, 3, segment2, 1, 350, HashFunction.NONE);
 
-    // Mutable segment's validDocIds should be 0 after removal
-    assertEquals(validDocIds1.getMutableRoaringBitmap().getCardinality(), 0);
     upsertMetadataManager.stop();
     upsertMetadataManager.close();
   }
@@ -2148,7 +2148,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
 
     // Create second real segment with 2 records (subset of first)
     int[] primaryKeys2 = new int[]{10, 30};
-    int[] timestamps2 = new int[]{1500, 3500};
+    int[] timestamps2 = new int[]{1600, 3600};
     ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
     ImmutableSegmentImpl segment2 = createRealSegment(segmentName, primaryKeys2, timestamps2, validDocIds2);
 
@@ -2156,8 +2156,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     upsertMetadataManager.replaceSegment(segment2, segment1);
 
     assertEquals(recordLocationMap.size(), 2);
-    checkRecordLocation(recordLocationMap, 10, segment2, 0, 1500, HashFunction.NONE);
-    checkRecordLocation(recordLocationMap, 30, segment2, 1, 3500, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 10, segment2, 0, 1600, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 30, segment2, 1, 3600, HashFunction.NONE);
     assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 2);
 
     upsertMetadataManager.stop();
@@ -2262,6 +2262,62 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     upsertMetadataManager.close();
   }
 
+  @Test
+  public void testProtectedModeRevertsMetadataForConsumingSegmentSeal()
+      throws IOException {
+    UpsertContext upsertContext =
+        _contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).setDropOutOfOrderRecord(true).build();
+
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+    Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
+
+    int[] mutablePrimaryKeys = new int[]{10, 20, 30};
+    ThreadSafeMutableRoaringBitmap validDocIdsMutable = new ThreadSafeMutableRoaringBitmap();
+    MutableSegment mutableSegment = mockMutableSegmentWithDataSource(1, validDocIdsMutable, null, mutablePrimaryKeys);
+
+    upsertMetadataManager.addRecord(mutableSegment,
+        new RecordInfo(makePrimaryKey(10), 0, Integer.valueOf(1000), false));
+    upsertMetadataManager.addRecord(mutableSegment,
+        new RecordInfo(makePrimaryKey(20), 1, Integer.valueOf(2000), false));
+    upsertMetadataManager.addRecord(mutableSegment,
+        new RecordInfo(makePrimaryKey(30), 2, Integer.valueOf(3000), false));
+
+    Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+    assertEquals(recordLocationMap.size(), 3);
+    assertEquals(validDocIdsMutable.getMutableRoaringBitmap().getCardinality(), 3);
+    trackedSegments.add(mutableSegment);
+
+    int numRecords = 3;
+    int[] primaryKeys = new int[]{10, 20, 30};
+    int[] timestamps = new int[]{900, 1900, 3500};
+    ThreadSafeMutableRoaringBitmap validDocIdsImmutable = new ThreadSafeMutableRoaringBitmap();
+    List<PrimaryKey> primaryKeysList = getPrimaryKeyList(numRecords, primaryKeys);
+    ImmutableSegmentImpl immutableSegment = mockImmutableSegmentWithTimestamps(1, validDocIdsImmutable, null,
+        primaryKeysList, timestamps);
+    trackedSegments.add(immutableSegment);
+
+    ConsumingSegmentConsistencyModeListener consistencyModeListener =
+        ConsumingSegmentConsistencyModeListener.getInstance();
+    consistencyModeListener.setMode(ConsumingSegmentConsistencyModeListener.Mode.PROTECTED);
+    try {
+      upsertMetadataManager.replaceSegment(immutableSegment, validDocIdsImmutable, null,
+          getRecordInfoListWithIntegerComparison(numRecords, primaryKeys, timestamps, null).iterator(), mutableSegment);
+    } finally {
+      consistencyModeListener.reset();
+    }
+
+    assertEquals(recordLocationMap.size(), 3);
+    checkRecordLocation(recordLocationMap, 10, immutableSegment, 0, 900, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 20, immutableSegment, 1, 1900, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 30, immutableSegment, 2, 3500, HashFunction.NONE);
+    assertTrue(upsertMetadataManager._previousKeyToRecordLocationMap.isEmpty());
+    assertEquals(validDocIdsImmutable.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
+
+    upsertMetadataManager.stop();
+    upsertMetadataManager.close();
+  }
+
   @Test
   public void testNoRevertForImmutableSegmentReplacement()
       throws IOException {
@@ -2300,12 +2356,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1, validDocIds2, null,
         primaryKeysList2, timestamps2);
 
-    long startTime = System.currentTimeMillis();
     upsertMetadataManager.replaceSegment(segment2, validDocIds2, null,
         getRecordInfoListWithIntegerComparison(numRecords2, primaryKeys2, timestamps2, null).iterator(), segment1);
-    long duration = System.currentTimeMillis() - startTime;
-
-    assertTrue(duration < 1000, "Immutable-to-immutable replacement should complete quickly, took: " + duration + "ms");
 
     assertEquals(recordLocationMap.size(), 1);
     checkRecordLocation(recordLocationMap, 10, segment2, 0, 1500, HashFunction.NONE);
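The tests above exercise the behavior named in the commit title: during a consuming-segment seal, the previous record locations of overwritten keys are stashed in a single map, which is cleared on success (hence the `_previousKeyToRecordLocationMap.isEmpty()` assertion) or replayed on revert. A minimal standalone sketch of that pattern, assuming `String` stand-ins for the real key and record-location types (this is illustrative, not Pinot's implementation):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative single-map revert pattern: while replacing a segment, each
// overwritten key's previous location is stashed in ONE map; on successful
// seal the map is cleared, on inconsistency it is replayed to restore state.
public class PreviousLocationRevertSketch {
  final Map<String, String> _primaryKeyToLocation = new HashMap<>();
  final Map<String, String> _previousKeyToLocation = new HashMap<>();

  // Record the new location, remembering what it replaced (oldest wins).
  void upsert(String key, String newLocation) {
    String old = _primaryKeyToLocation.put(key, newLocation);
    if (old != null && !_previousKeyToLocation.containsKey(key)) {
      _previousKeyToLocation.put(key, old);
    }
  }

  // Successful seal: drop the snapshots.
  void commit() {
    _previousKeyToLocation.clear();
  }

  // Inconsistency detected: put every previous location back.
  void revert() {
    _primaryKeyToLocation.putAll(_previousKeyToLocation);
    _previousKeyToLocation.clear();
  }
}
```

Using one map (instead of the two maps the commit log mentions replacing) keeps the stash and its removal atomic per key, so a partially applied replacement can always be unwound from a single source of truth.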
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 7010bd16032..f95f7f64d3b 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -84,7 +84,6 @@ import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager;
 import org.apache.pinot.core.data.manager.realtime.ServerRateLimitConfigChangeListener;
-import org.apache.pinot.core.data.manager.realtime.UpsertInconsistentStateConfig;
 import org.apache.pinot.core.instance.context.ServerContext;
 import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
 import org.apache.pinot.core.transport.ListenerConfig;
@@ -132,6 +131,7 @@ import org.apache.pinot.spi.utils.CommonConstants.Helix.Instance;
 import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel;
 import org.apache.pinot.spi.utils.CommonConstants.Server;
 import org.apache.pinot.spi.utils.CommonConstants.Server.SegmentCompletionProtocol;
+import org.apache.pinot.spi.utils.ConsumingSegmentConsistencyModeListener;
 import org.apache.pinot.spi.utils.InstanceTypeUtils;
 import org.apache.pinot.spi.utils.NetUtils;
 import org.apache.pinot.spi.utils.PinotMd5Mode;
@@ -287,8 +287,9 @@ public abstract class BaseServerStarter implements ServiceStartable {
     LOGGER.info("Registered ClusterConfigForTable change listener");
     // Register configuration change listener for upsert force commit/reload disable setting
     _clusterConfigChangeHandler.registerClusterConfigChangeListener(
-        UpsertInconsistentStateConfig.getInstance());
-    LOGGER.info("Registered UpsertInconsistentStateConfig change listener for dynamic force commit/reload control");
+        ConsumingSegmentConsistencyModeListener.getInstance());
+    LOGGER.info(
+        "Registered ConsumingSegmentConsistencyModeListener change listener for dynamic force commit/reload control");
 
     LOGGER.info("Initializing Helix manager with zkAddress: {}, clusterName: {}, instanceId: {}", _zkAddress,
         _helixClusterName, _instanceId);
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 6dee647e915..9d7464cca97 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -56,13 +56,13 @@ import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
 import org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender;
 import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
-import org.apache.pinot.core.data.manager.realtime.UpsertInconsistentStateConfig;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.utils.SegmentLocks;
 import org.apache.pinot.segment.local.utils.SegmentOperationsThrottlerSet;
 import org.apache.pinot.segment.local.utils.SegmentReloadSemaphore;
 import org.apache.pinot.segment.local.utils.ServerReloadJobStatusCache;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
@@ -73,6 +73,7 @@ import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.utils.ConsumingSegmentConsistencyModeListener;
 import org.apache.pinot.spi.utils.TimestampIndexUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.zookeeper.data.Stat;
@@ -549,26 +550,28 @@ public class HelixInstanceDataManager implements InstanceDataManager {
         tableNameWithType, segmentNames));
     TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType);
     if (tableDataManager != null) {
+      // Use cached table config for performance - properties we check (replication, upsert mode)
+      // rarely change after table creation, and cache is refreshed on reload
+      TableConfig tableConfig = tableDataManager.getCachedTableConfigAndSchema().getLeft();
+      boolean isTableTypeInconsistentDuringConsumption =
+          TableConfigUtils.isTableTypeInconsistentDuringConsumption(tableConfig);
+      ConsumingSegmentConsistencyModeListener config = ConsumingSegmentConsistencyModeListener.getInstance();
+
+      // Only restrict force commit for tables with inconsistent state configs
+      // (partial upsert or dropOutOfOrderRecord=true with replication > 1)
+      // when mode is DEFAULT (isForceCommitAllowed = false)
+      if (isTableTypeInconsistentDuringConsumption && !config.isForceCommitAllowed()) {
+        LOGGER.warn("Force commit disabled for table: {} due to inconsistent state config. "
+            + "Change the config to `PROTECTED` via cluster config: {}", tableNameWithType, config.getConfigKey());
+        return;
+      }
+
       segmentNames.forEach(segName -> {
         SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segName);
         if (segmentDataManager != null) {
           try {
             if (segmentDataManager instanceof RealtimeSegmentDataManager) {
-              // Force-committing consuming segments is enabled by default.
-              // For partial-upsert tables or upserts with out-of-order events enabled (notably when replication > 1),
-              // winner selection could incorrectly favor replicas with fewer consumed rows.
-              // This triggered unnecessary reconsumption and resulted in inconsistent upsert state.
-              // The fix restores correct segment metadata before winner selection.
-              // Force commit behavior can be toggled dynamically using the cluster config
-              // `pinot.server.upsert.force.commit.reload` without restarting servers.
-              TableConfig tableConfig = tableDataManager.getCachedTableConfigAndSchema().getLeft();
-              UpsertInconsistentStateConfig config = UpsertInconsistentStateConfig.getInstance();
-              if (config.isForceCommitReloadAllowed(tableConfig)) {
-                ((RealtimeSegmentDataManager) segmentDataManager).forceCommit();
-              } else {
-                LOGGER.warn("Force commit disabled for table: {} due to inconsistent state config. "
-                    + "Control via cluster config: {}", tableNameWithType, config.getConfigKey());
-              }
+              ((RealtimeSegmentDataManager) segmentDataManager).forceCommit();
             }
           } finally {
             tableDataManager.releaseSegment(segmentDataManager);
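The gating above reduces to a single predicate: force commit proceeds unless the table's config is inconsistency-prone and the cluster mode disallows force commit. A standalone sketch of that decision (class name, flag, and helper are illustrative stand-ins for `TableConfigUtils.isTableTypeInconsistentDuringConsumption` and the listener's mode check, not Pinot code):

```java
// Standalone sketch of the force-commit gating decision in the diff above.
// Mode mirrors ConsumingSegmentConsistencyModeListener.Mode; the boolean
// argument stands in for the "inconsistent during consumption" table check.
public class ForceCommitGateSketch {
  enum Mode {
    RESTRICTED(false), PROTECTED(true), UNSAFE(true);

    final boolean _forceCommitAllowed;

    Mode(boolean forceCommitAllowed) {
      _forceCommitAllowed = forceCommitAllowed;
    }
  }

  // Commit is blocked only for the (inconsistency-prone, RESTRICTED) pair;
  // PROTECTED and UNSAFE both allow it, differing only in revert behavior.
  static boolean shouldForceCommit(boolean inconsistentDuringConsumption, Mode mode) {
    return !(inconsistentDuringConsumption && !mode._forceCommitAllowed);
  }

  public static void main(String[] args) {
    System.out.println(shouldForceCommit(true, Mode.RESTRICTED)); // false
    System.out.println(shouldForceCommit(true, Mode.PROTECTED));  // true
    System.out.println(shouldForceCommit(false, Mode.RESTRICTED)); // true
  }
}
```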
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 88b9bc30520..d9f1987fd92 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -2183,15 +2183,14 @@ public class CommonConstants {
    */
   public static class ConfigChangeListenerConstants {
     /**
-     * Cluster config key to control whether force commit/reload is allowed for upsert tables
-     * with inconsistent state configurations (partial upsert or dropOutOfOrderRecord=true
-     * with consistency mode NONE and replication > 1).
-     */
-    public static final String FORCE_COMMIT_RELOAD_CONFIG = "pinot.server.upsert.force.commit.reload";
-
-    /**
-     * Default value: true (force commit/reload is allowed by default).
+     * Cluster config key to control how to handle inconsistency during consuming segment commit
+     * for upsert/dedup tables (partial upsert or dropOutOfOrderRecord=true with consistency mode).
+     *
+     * Supported values:
+     * - RESTRICTED: Force commit is disabled for tables with inconsistent state table configurations
+     * - PROTECTED: Force commit is enabled with metadata reversion on inconsistencies
+     * - UNSAFE: Force commit is enabled without metadata reversion (can lead to inconsistencies)
      */
-    public static final boolean DEFAULT_FORCE_COMMIT_RELOAD = true;
+    public static final String CONSUMING_SEGMENT_CONSISTENCY_MODE = "pinot.server.consuming.segment.consistency.mode";
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ConsumingSegmentConsistencyModeListener.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ConsumingSegmentConsistencyModeListener.java
new file mode 100644
index 00000000000..869754a266c
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ConsumingSegmentConsistencyModeListener.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
+import org.apache.pinot.spi.utils.CommonConstants.ConfigChangeListenerConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Singleton class to manage the configuration for force commit on consuming segments
+ * for upsert tables with inconsistent state configurations (partial upsert, dropOutOfOrderRecord=true, or
+ * outOfOrderRecordColumn). By default (RESTRICTED mode), force commit and reload are disabled for such tables.
+ * This configuration is dynamically updatable via ZK cluster config without requiring a server restart.
+ */
+public class ConsumingSegmentConsistencyModeListener implements PinotClusterConfigChangeListener {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ConsumingSegmentConsistencyModeListener.class);
+  private static final ConsumingSegmentConsistencyModeListener INSTANCE = new ConsumingSegmentConsistencyModeListener();
+
+  public enum Mode {
+    /**
+     * Force commit is disabled for tables with inconsistent state configurations.
+     * Safe option that prevents potential data inconsistency issues.
+     */
+    RESTRICTED(false),
+
+    /**
+     * Force commit is enabled but tables with partial upsert or dropOutOfOrderRecord=true (with replication > 1)
+     * will have their upsert metadata reverted when inconsistencies are detected.
+     */
+    PROTECTED(true),
+
+    /**
+     * Force commit is enabled for all tables regardless of their configuration.
+     * Use with caution as this may cause data inconsistency for partial-upsert tables
+     * or upsert tables with dropOutOfOrderRecord/outOfOrderRecordColumn enabled when replication > 1.
+     * Inconsistency checks and metadata revert are skipped.
+     */
+    UNSAFE(true);
+
+    public static final Mode DEFAULT_CONSUMING_SEGMENT_CONSISTENCY_MODE = RESTRICTED;
+
+    private final boolean _forceCommitAllowed;
+
+    Mode(boolean forceCommitAllowed) {
+      _forceCommitAllowed = forceCommitAllowed;
+    }
+
+    public boolean isForceCommitAllowed() {
+      return _forceCommitAllowed;
+    }
+
+    public static Mode fromString(String value) {
+      if (value == null || value.trim().isEmpty()) {
+        return DEFAULT_CONSUMING_SEGMENT_CONSISTENCY_MODE;
+      }
+      String normalized = value.trim().toUpperCase();
+      try {
+        return Mode.valueOf(normalized);
+      } catch (IllegalArgumentException e) {
+        return DEFAULT_CONSUMING_SEGMENT_CONSISTENCY_MODE;
+      }
+    }
+  }
+
+  private final AtomicReference<Mode> _consistencyMode =
+      new AtomicReference<>(Mode.DEFAULT_CONSUMING_SEGMENT_CONSISTENCY_MODE);
+
+  private ConsumingSegmentConsistencyModeListener() {
+  }
+
+  public static ConsumingSegmentConsistencyModeListener getInstance() {
+    return INSTANCE;
+  }
+
+  public boolean isForceCommitAllowed() {
+    return _consistencyMode.get().isForceCommitAllowed();
+  }
+
+  public Mode getConsistencyMode() {
+    return _consistencyMode.get();
+  }
+
+  public String getConfigKey() {
+    return ConfigChangeListenerConstants.CONSUMING_SEGMENT_CONSISTENCY_MODE;
+  }
+
+  @Override
+  public void onChange(Set<String> changedConfigs, Map<String, String> clusterConfigs) {
+    if (!changedConfigs.contains(ConfigChangeListenerConstants.CONSUMING_SEGMENT_CONSISTENCY_MODE)) {
+      return;
+    }
+
+    String configValue = clusterConfigs.get(ConfigChangeListenerConstants.CONSUMING_SEGMENT_CONSISTENCY_MODE);
+    Mode newMode = Mode.fromString(configValue);
+
+    Mode previousMode = _consistencyMode.getAndSet(newMode);
+    if (previousMode != newMode) {
+      LOGGER.info("Updated cluster config: {} from {} to {}",
+          ConfigChangeListenerConstants.CONSUMING_SEGMENT_CONSISTENCY_MODE, previousMode, newMode);
+    }
+  }
+
+  @VisibleForTesting
+  public void setMode(Mode mode) {
+    _consistencyMode.set(mode);
+  }
+
+  @VisibleForTesting
+  public void reset() {
+    _consistencyMode.set(Mode.DEFAULT_CONSUMING_SEGMENT_CONSISTENCY_MODE);
+  }
+}
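The parsing contract of `Mode.fromString` above is that null, blank, and unrecognized values all fall back to the default (RESTRICTED) rather than throwing. A minimal standalone mirror of that behavior for illustration (the real enum lives in `ConsumingSegmentConsistencyModeListener`; this sketch just reproduces its trim/uppercase/fallback logic):

```java
// Standalone mirror of Mode.fromString's fallback behavior: trim and
// uppercase the input, and fall back to RESTRICTED (the default) for
// null, blank, or unrecognized values.
public class ModeParseSketch {
  enum Mode { RESTRICTED, PROTECTED, UNSAFE }

  static final Mode DEFAULT_MODE = Mode.RESTRICTED;

  static Mode fromString(String value) {
    if (value == null || value.trim().isEmpty()) {
      return DEFAULT_MODE;
    }
    try {
      return Mode.valueOf(value.trim().toUpperCase());
    } catch (IllegalArgumentException e) {
      // Unknown mode names degrade to the safe default instead of failing.
      return DEFAULT_MODE;
    }
  }

  public static void main(String[] args) {
    System.out.println(fromString("protected"));  // PROTECTED
    System.out.println(fromString("  unsafe ")); // UNSAFE
    System.out.println(fromString("bogus"));     // RESTRICTED
    System.out.println(fromString(null));        // RESTRICTED
  }
}
```

Falling back silently (rather than throwing) means a typo in the cluster config leaves the server in the safe RESTRICTED mode instead of breaking the config-change listener.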


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
