This is an automated email from the ASF dual-hosted git repository.

haridsv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d8cfe9714 PHOENIX-7759: Preserve buffered mutations when batch size 
limit is exceeded (#2371)
7d8cfe9714 is described below

commit 7d8cfe9714d969d8c20175ff12947a8a0764b8ce
Author: Hari Krishna Dara <[email protected]>
AuthorDate: Mon Feb 16 11:06:19 2026 +0530

    PHOENIX-7759: Preserve buffered mutations when batch size limit is exceeded 
(#2371)
    
    - Adds new configuration property `phoenix.mutate.preserveOnLimitExceeded` 
that changes mutation limit behavior, which can be overridden at the connection 
level via connection properties.
    - When enabled, limit checks occur BEFORE state modification, preserving 
existing buffered mutations
    - Introduces `MutationLimitReachedException` (for executeUpdate) and 
`MutationLimitBatchException` (for executeBatch) to signal recoverable limit 
conditions
    - Allows applications to commit buffered mutations and continue processing 
instead of losing state, which facilitates **"dynamic sizing"** for the batch.
---
 .../apache/phoenix/exception/SQLExceptionCode.java |   3 +
 .../org/apache/phoenix/execute/MutationState.java  | 124 +++--
 .../phoenix/jdbc/MutationLimitBatchException.java  |  50 ++
 .../org/apache/phoenix/jdbc/PhoenixStatement.java  |  21 +
 .../org/apache/phoenix/query/QueryServices.java    |   2 +
 .../apache/phoenix/query/QueryServicesOptions.java |   1 +
 .../org/apache/phoenix/schema/MetaDataClient.java  |   6 +-
 .../schema/MutationLimitReachedException.java      |  37 ++
 .../apache/phoenix/util/PhoenixKeyValueUtil.java   |  15 -
 .../apache/phoenix/end2end/MutationStateIT.java    | 526 +++++++++++++++++++++
 10 files changed, 734 insertions(+), 51 deletions(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 5d9fde1659..3eddf2278f 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -740,6 +740,9 @@ public enum SQLExceptionCode {
           info.getMaxPhoenixColumnSizeBytes(), 
info.getPhoenixColumnSizeBytes());
       }
     }),
+  MUTATION_LIMIT_REACHED(733, "LIM04",
+    "Mutation buffer limit reached. Existing mutations are preserved. "
+      + "Commit current mutations and retry the failed operation."),
   INSUFFICIENT_MEMORY(999, "50M01", "Unable to allocate enough memory."),
   HASH_JOIN_CACHE_NOT_FOUND(900, "HJ01", "Hash Join cache not found"),
 
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
index fb82692be5..52517a884d 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -31,10 +31,12 @@ import static 
org.apache.phoenix.monitoring.MetricType.NUM_METADATA_LOOKUP_FAILU
 import static 
org.apache.phoenix.monitoring.MetricType.UPSERT_AGGREGATE_FAILURE_SQL_COUNTER;
 import static 
org.apache.phoenix.monitoring.MetricType.UPSERT_AGGREGATE_SUCCESS_SQL_COUNTER;
 import static 
org.apache.phoenix.query.QueryServices.INDEX_REGION_OBSERVER_ENABLED_ALL_TABLES_ATTRIB;
+import static 
org.apache.phoenix.query.QueryServices.PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.SOURCE_OPERATION_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServices.WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_INDEX_REGION_OBSERVER_ENABLED_ALL_TABLES;
+import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB;
 import static 
org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkNotNull;
@@ -99,6 +101,7 @@ import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.schema.MaxMutationSizeBytesExceededException;
 import org.apache.phoenix.schema.MaxMutationSizeExceededException;
 import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.MutationLimitReachedException;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PMetaData;
@@ -187,6 +190,7 @@ public class MutationState implements SQLCloseable {
 
   private final boolean indexRegionObserverEnabledAllTables;
   private final boolean serverSideImmutableIndexes;
+  private final boolean preserveOnLimitExceeded;
 
   /**
    * Return result back to client. To be used when client needs to read the 
whole row or some
@@ -268,6 +272,8 @@ public class MutationState implements SQLCloseable {
     this.serverSideImmutableIndexes = 
this.connection.getQueryServices().getConfiguration()
       .getBoolean(SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB,
         DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED);
+    this.preserveOnLimitExceeded = 
this.connection.getQueryServices().getProps().getBoolean(
+      PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB, 
DEFAULT_PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED);
   }
 
   public MutationState(TableRef table, MultiRowMutationState mutations, long 
sizeOffset,
@@ -507,6 +513,17 @@ public class MutationState implements SQLCloseable {
     }
   }
 
+  /**
+   * Pre-check for preserve mode: throws MutationLimitReachedException if 
adding the given
+   * rows/bytes would exceed limits, which prevents the caller from modifying 
the state.
+   */
+  private void throwIfLimitWouldBeExceeded(int additionalRows, long 
additionalBytes)
+    throws SQLException {
+    if (numRows + additionalRows > maxSize || estimatedSize + additionalBytes 
> maxSizeBytes) {
+      throw new MutationLimitReachedException();
+    }
+  }
+
   public long getUpdateCount() {
     return sizeOffset + numRows;
   }
@@ -537,20 +554,25 @@ public class MutationState implements SQLCloseable {
   }
 
   private void joinMutationState(TableRef tableRef, MultiRowMutationState 
srcRows,
-    Map<TableRef, List<MultiRowMutationState>> dstMutations) {
+    Map<TableRef, List<MultiRowMutationState>> dstMutations) throws 
SQLException {
     PTable table = tableRef.getTable();
     boolean isIndex = table.getType() == PTableType.INDEX;
     boolean incrementRowCount = dstMutations == this.mutationsMap;
+    boolean impactsLimits = incrementRowCount && !isIndex;
     // we only need to check if the new mutation batch (srcRows) conflicts 
with the
     // last mutation batch since we try to merge it with that only
     MultiRowMutationState existingRows = getLastMutationBatch(dstMutations, 
tableRef);
 
     if (existingRows == null) { // no rows found for this table
+      // For preserve mode, check limits BEFORE modifying any state
+      if (impactsLimits && preserveOnLimitExceeded) {
+        throwIfLimitWouldBeExceeded(srcRows.size(), srcRows.estimatedSize);
+      }
       // Size new map at batch size as that's what it'll likely grow to.
       MultiRowMutationState newRows = new 
MultiRowMutationState(connection.getMutateBatchSize());
       newRows.putAll(srcRows);
       addMutations(dstMutations, tableRef, newRows);
-      if (incrementRowCount && !isIndex) {
+      if (impactsLimits) {
         numRows += srcRows.size();
         // if we added all the rows from newMutationState we can just 
increment the
         // estimatedSize by newMutationState.estimatedSize
@@ -562,6 +584,7 @@ public class MutationState implements SQLCloseable {
     // for conflicting rows
     MultiRowMutationState conflictingRows =
       new MultiRowMutationState(connection.getMutateBatchSize());
+    long conflictingRowsTotalSize = 0;
 
     // Rows for this table already exist, check for conflicts
     for (Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry : 
srcRows.entrySet()) {
@@ -569,29 +592,43 @@ public class MutationState implements SQLCloseable {
       RowMutationState newRowMutationState = rowEntry.getValue();
       RowMutationState existingRowMutationState = existingRows.get(key);
       if (existingRowMutationState == null) {
+        // For preserve mode, check limits BEFORE modifying any state
+        long newRowSize = newRowMutationState.calculateEstimatedSize();
+        if (impactsLimits && preserveOnLimitExceeded) {
+          throwIfLimitWouldBeExceeded(1, newRowSize);
+        }
         existingRows.put(key, newRowMutationState);
-        if (incrementRowCount && !isIndex) { // Don't count index rows in row 
count
+        if (impactsLimits) { // Don't count index rows in row count
           numRows++;
           // increment estimated size by the size of the new row
-          estimatedSize += newRowMutationState.calculateEstimatedSize();
+          estimatedSize += newRowSize;
         }
         continue;
       }
+
       Map<PColumn, byte[]> existingValues = 
existingRowMutationState.getColumnValues();
       Map<PColumn, byte[]> newValues = newRowMutationState.getColumnValues();
       if (existingValues != PRow.DELETE_MARKER && newValues != 
PRow.DELETE_MARKER) {
-        // Check if we can merge existing column values with new column values
-        long beforeMergeSize = 
existingRowMutationState.calculateEstimatedSize();
-        boolean isMerged = existingRowMutationState.join(rowEntry.getValue());
-        if (isMerged) {
-          // decrement estimated size by the size of the old row
-          estimatedSize -= beforeMergeSize;
-          // increment estimated size by the size of the new row
-          estimatedSize += existingRowMutationState.calculateEstimatedSize();
+        // Check if we can merge existing column values with new column values.
+        // For preserve mode, pass this instance so join() can check limits 
before modification.
+        Long sizeDiff =
+          existingRowMutationState.join(newRowMutationState, 
preserveOnLimitExceeded ? this : null);
+        if (sizeDiff != null) {
+          // Merged successfully (row count unchanged - same row key)
+          estimatedSize += sizeDiff;
         } else {
           // cannot merge regular upsert and conditional upsert
-          // conflicting row is not a new row so no need to increment numRows
+          // conflicting row goes into a separate batch (same key, different 
semantics)
+          // Row count unchanged (same key), but size increases
+          long conflictingRowSize = 
newRowMutationState.calculateEstimatedSize();
+          if (impactsLimits && preserveOnLimitExceeded) {
+            // Include already-accumulated conflicting rows size in the check
+            throwIfLimitWouldBeExceeded(0, conflictingRowsTotalSize + 
conflictingRowSize);
+          }
           conflictingRows.put(key, newRowMutationState);
+          if (impactsLimits) {
+            conflictingRowsTotalSize += conflictingRowSize;
+          }
         }
       } else {
         existingRows.put(key, newRowMutationState);
@@ -600,11 +637,13 @@ public class MutationState implements SQLCloseable {
 
     if (!conflictingRows.isEmpty()) {
       addMutations(dstMutations, tableRef, conflictingRows);
+      // Update estimatedSize only after actual state change
+      estimatedSize += conflictingRowsTotalSize;
     }
   }
 
   private void joinMutationState(Map<TableRef, List<MultiRowMutationState>> 
srcMutations,
-    Map<TableRef, List<MultiRowMutationState>> dstMutations) {
+    Map<TableRef, List<MultiRowMutationState>> dstMutations) throws 
SQLException {
     // Merge newMutation with this one, keeping state from newMutation for any 
overlaps
     for (Map.Entry<TableRef, List<MultiRowMutationState>> entry : 
srcMutations.entrySet()) {
       TableRef tableRef = entry.getKey();
@@ -1609,7 +1648,6 @@ public class MutationState implements SQLCloseable {
               "Sent batch of " + mutationBatch.size() + " for " + 
Bytes.toString(htableName));
           }
           child.stop();
-          child.stop();
           shouldRetry = false;
           numFailedMutations = 0;
 
@@ -2406,36 +2444,56 @@ public class MutationState implements SQLCloseable {
 
     /**
      * Join the newRow with the current row if it doesn't conflict with it. A 
regular upsert
-     * conflicts with a conditional upsert
-     * @return True if the rows were successfully joined else False
+     * conflicts with a conditional upsert.
+     * @param mutationState if non-null, checks limits before modification and 
throws
+     *                      MutationLimitReachedException if size increase 
would exceed limits
+     * @return the size change (can be 0, positive, or negative) if merged, or 
null if conflicting
      */
-    boolean join(RowMutationState newRow) {
+    Long join(RowMutationState newRow, MutationState mutationState) throws 
SQLException {
       if (isConflicting(newRow)) {
-        return false;
+        return null;
       }
-      // If we already have a row and the new row has an ON DUPLICATE KEY 
clause
-      // ignore the new values (as that's what the server will do).
+
+      // Pre-compute merged results (no side effects - these return new 
objects)
+      byte[] combinedOnDupKey =
+        PhoenixIndexBuilderHelper.combineOnDupKey(this.onDupKeyBytes, 
newRow.onDupKeyBytes);
+      int[] mergedIndexes = joinSortedIntArrays(statementIndexes, 
newRow.getStatementIndexes());
+
+      // Calculate column values size change
+      long colValuesSizeDiff = 0;
       if (newRow.onDupKeyBytes == null) {
-        // increment the column value size by the new row column value size
-        colValuesSize += newRow.colValuesSize;
+        colValuesSizeDiff = newRow.colValuesSize;
         for (Map.Entry<PColumn, byte[]> entry : 
newRow.columnValues.entrySet()) {
-          PColumn col = entry.getKey();
-          byte[] oldValue = columnValues.put(col, entry.getValue());
+          byte[] oldValue = columnValues.get(entry.getKey());
           if (oldValue != null) {
-            // decrement column value size by the size of all column values 
that were replaced
-            colValuesSize -= (col.getEstimatedSize() + oldValue.length);
+            colValuesSizeDiff -= entry.getKey().getEstimatedSize() + 
oldValue.length;
           }
         }
       }
-      // Concatenate ON DUPLICATE KEY bytes to allow multiple
-      // increments of the same row in the same commit batch.
-      this.onDupKeyBytes =
-        PhoenixIndexBuilderHelper.combineOnDupKey(this.onDupKeyBytes, 
newRow.onDupKeyBytes);
+
+      // Total size change (can be negative)
+      long totalSizeDiff = colValuesSizeDiff
+        + ((combinedOnDupKey != null ? combinedOnDupKey.length : 0)
+          - (this.onDupKeyBytes != null ? this.onDupKeyBytes.length : 0))
+        + (mergedIndexes.length - statementIndexes.length) * 
SizedUtil.INT_SIZE;
+
+      // Check limit BEFORE any modification (row count unchanged for merge - 
same row key)
+      if (mutationState != null) {
+        mutationState.throwIfLimitWouldBeExceeded(0, totalSizeDiff);
+      }
+
+      // Apply modifications
+      this.onDupKeyBytes = combinedOnDupKey;
+      this.statementIndexes = mergedIndexes;
       if (newRow.onDupKeyType == OnDuplicateKeyType.UPDATE_ONLY) {
         this.onDupKeyType = OnDuplicateKeyType.UPDATE_ONLY;
       }
-      statementIndexes = joinSortedIntArrays(statementIndexes, 
newRow.getStatementIndexes());
-      return true;
+      if (newRow.onDupKeyBytes == null) {
+        columnValues.putAll(newRow.columnValues);
+        colValuesSize += colValuesSizeDiff;
+      }
+
+      return totalSizeDiff;
     }
 
     @Nonnull
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/MutationLimitBatchException.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/MutationLimitBatchException.java
new file mode 100644
index 0000000000..f532d109b0
--- /dev/null
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/MutationLimitBatchException.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import java.sql.BatchUpdateException;
+import org.apache.phoenix.schema.MutationLimitReachedException;
+
+/**
+ * Thrown from executeBatch() when the mutation buffer limit is reached. The 
batch is automatically
+ * trimmed to contain only unprocessed items.
+ */
+public class MutationLimitBatchException extends BatchUpdateException {
+  private static final long serialVersionUID = 1L;
+
+  private final int processedCount;
+
+  /**
+   * @param updateCounts   array of update counts for each statement in the 
batch
+   * @param cause          the underlying MutationLimitReachedException
+   * @param processedCount number of statements successfully processed
+   */
+  public MutationLimitBatchException(int[] updateCounts, 
MutationLimitReachedException cause,
+    int processedCount) {
+    super(cause.getMessage(), cause.getSQLState(), cause.getErrorCode(), 
updateCounts, cause);
+    this.processedCount = processedCount;
+  }
+
+  /**
+   * Returns the number of statements that were successfully processed before 
the limit was reached.
+   * The batch has been trimmed to contain only the remaining unprocessed 
items.
+   */
+  public int getProcessedCount() {
+    return processedCount;
+  }
+}
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 0380ae8a1c..bd0dc32565 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -204,6 +204,7 @@ import 
org.apache.phoenix.schema.ExecuteUpdateNotApplicableException;
 import org.apache.phoenix.schema.FunctionNotFoundException;
 import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.MetaDataEntityNotFoundException;
+import org.apache.phoenix.schema.MutationLimitReachedException;
 import org.apache.phoenix.schema.PColumnImpl;
 import org.apache.phoenix.schema.PDatum;
 import org.apache.phoenix.schema.PIndexState;
@@ -2451,6 +2452,26 @@ public class PhoenixStatement implements 
PhoenixMonitoredStatement, SQLCloseable
         connection.commit();
       }
       return returnCodes;
+    } catch (MutationLimitReachedException limitEx) {
+      // Special handling: limit reached but existing mutations preserved
+      // Statements 0 through i-1 were successfully added to MutationState
+      // Statement at index i was NOT added (its join was rejected by the 
pre-check)
+
+      // If original autoCommit was true, commit what we have buffered
+      if (autoCommit) {
+        connection.commit();
+      }
+
+      // Trim the batch list to contain only unprocessed items (from index i 
to end)
+      if (i > 0) {
+        batch.subList(0, i).clear();
+      }
+
+      // Mark the failed index
+      returnCodes[i] = Statement.EXECUTE_FAILED;
+
+      // Throw MutationLimitBatchException with checkpoint information
+      throw new MutationLimitBatchException(returnCodes, limitEx, i);
     } catch (SQLException t) {
       if (i == returnCodes.length) {
         // Exception after for loop, perhaps in commit(), discard returnCodes.
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index 8adc461a38..630a2d4f21 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -97,6 +97,8 @@ public interface QueryServices extends SQLCloseable {
   public static final String SCAN_CACHE_SIZE_ATTRIB = 
"hbase.client.scanner.caching";
   public static final String MAX_MUTATION_SIZE_ATTRIB = 
"phoenix.mutate.maxSize";
   public static final String MAX_MUTATION_SIZE_BYTES_ATTRIB = 
"phoenix.mutate.maxSizeBytes";
+  public static final String PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB =
+    "phoenix.mutate.preserveOnLimitExceeded";
   public static final String HBASE_CLIENT_KEYVALUE_MAXSIZE = 
"hbase.client.keyvalue.maxsize";
 
   public static final String MUTATE_BATCH_SIZE_ATTRIB = 
"phoenix.mutate.batchSize";
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 7fdd24fe70..082b3a3216 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -174,6 +174,7 @@ public class QueryServicesOptions {
   public static final boolean DEFAULT_CALL_QUEUE_ROUND_ROBIN = true;
   public static final int DEFAULT_MAX_MUTATION_SIZE = 500000;
   public static final int DEFAULT_MAX_MUTATION_SIZE_BYTES = 104857600; // 100 
Mb
+  public static final boolean DEFAULT_PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED = 
false;
   public static final int DEFAULT_HBASE_CLIENT_KEYVALUE_MAXSIZE = 10485760; // 
10 Mb
   public static final boolean DEFAULT_USE_INDEXES = true; // Use indexes
   public static final boolean DEFAULT_IMMUTABLE_ROWS = false; // Tables rows 
may be updated
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 0da85f851d..5008e28f60 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -4870,9 +4870,9 @@ public class MetaDataClient {
           /**
            * To check if TTL is defined at any of the child below we are 
checking it at
            * {@link 
org.apache.phoenix.coprocessor.MetaDataEndpointImpl#mutateColumn(List, 
ColumnMutator, int, PTable, PTable, boolean)}
-           * level where in function
-           * {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl# 
validateIfMutationAllowedOnParent(PTable, List, PTableType, long, byte[], 
byte[], byte[], List, int)}
-           * we are already traversing through allDescendantViews.
+           * level where in function {@link 
org.apache.phoenix.coprocessor.MetaDataEndpointImpl#
+           * validateIfMutationAllowedOnParent(PTable, List, PTableType, long, 
byte[], byte[],
+           * byte[], List, int)} we are already traversing through 
allDescendantViews.
            */
         }
 
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MutationLimitReachedException.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MutationLimitReachedException.java
new file mode 100644
index 0000000000..d6d750d774
--- /dev/null
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MutationLimitReachedException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import java.sql.SQLException;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+
+/**
+ * Exception thrown when a mutation operation would exceed the configured 
mutation buffer size
+ * limit. Unlike MaxMutationSizeBytesExceededException, existing buffered 
mutations are preserved
+ * and can be committed before retrying.
+ */
+public class MutationLimitReachedException extends SQLException {
+  private static final long serialVersionUID = 1L;
+  private static final SQLExceptionCode CODE = 
SQLExceptionCode.MUTATION_LIMIT_REACHED;
+
+  public MutationLimitReachedException() {
+    super(new SQLExceptionInfo.Builder(CODE).build().toString(), 
CODE.getSQLState(),
+      CODE.getErrorCode());
+  }
+}
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java
index 4f4bbffa5e..a2070712e0 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java
@@ -173,21 +173,6 @@ public class PhoenixKeyValueUtil {
     return size;
   }
 
-  /**
-   * Estimates the storage size of a row
-   * @param tableMutationMap map from table to row to RowMutationState
-   * @return estimated row size
-   */
-  public static long
-    getEstimatedRowMutationSize(Map<TableRef, MultiRowMutationState> 
tableMutationMap) {
-    long size = 0;
-    // iterate over table
-    for (Entry<TableRef, MultiRowMutationState> tableEntry : 
tableMutationMap.entrySet()) {
-      size += calculateMultiRowMutationSize(tableEntry.getValue());
-    }
-    return size;
-  }
-
   public static long getEstimatedRowMutationSizeWithBatch(
     Map<TableRef, List<MultiRowMutationState>> tableMutationMap) {
     long size = 0;
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
index c654601a4a..0b470acd14 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
@@ -45,8 +45,11 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.MutationLimitBatchException;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.MaxMutationSizeExceededException;
+import org.apache.phoenix.schema.MutationLimitReachedException;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.util.Repeat;
@@ -81,6 +84,31 @@ public class MutationStateIT extends ParallelStatsDisabledIT 
{
     }
   }
 
+  /**
+   * Helper to create Properties for preserve mode tests.
+   * @param maxRows  max mutation row count limit
+   * @param maxBytes max mutation byte size limit (0 to skip setting)
+   */
+  private Properties createPreserveModeProps(int maxRows, long maxBytes) {
+    Properties props = new Properties();
+    props.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, 
String.valueOf(maxRows));
+    if (maxBytes > 0) {
+      props.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, 
String.valueOf(maxBytes));
+    }
+    
props.setProperty(QueryServices.PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB, 
"true");
+    return props;
+  }
+
+  /**
+   * Helper to verify the row count in a table.
+   */
+  private void verifyRowCount(Connection conn, String tableName, int expected) 
throws SQLException {
+    try (ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) 
FROM " + tableName)) {
+      assertTrue("Should have results", rs.next());
+      assertEquals("Row count mismatch", expected, rs.getInt(1));
+    }
+  }
+
   public static String randString(int length) {
     return new BigInteger(164, RAND).toString().substring(0, length);
   }
@@ -313,6 +341,475 @@ public class MutationStateIT extends 
ParallelStatsDisabledIT {
     }
   }
 
+  /**
+   * Tests that when preserveOnLimitExceeded=true, executeUpdate() throws
+   * MutationLimitReachedException without clearing buffered mutations, 
allowing the client to
+   * commit and retry.
+   */
+  @Test
+  public void testExecuteUpdatePreserveOnRowCountLimitExceeded() throws 
Exception {
+    String fullTableName = generateUniqueName();
+    int maxRows = 5;
+
+    // Create table
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL);
+    }
+
+    try (PhoenixConnection conn = (PhoenixConnection) 
DriverManager.getConnection(getUrl(),
+      createPreserveModeProps(maxRows, 0))) {
+      conn.setAutoCommit(false);
+
+      PreparedStatement stmt = conn.prepareStatement(
+        "UPSERT INTO " + fullTableName + " (organization_id, entity_id, score) 
VALUES (?,?,?)");
+
+      int totalRows = 12;
+      int processed = 0;
+      int commitCount = 0;
+
+      while (processed < totalRows) {
+        try {
+          for (int i = processed; i < totalRows; i++) {
+            stmt.setString(1, "ORG" + String.format("%011d", i));
+            stmt.setString(2, "ENT" + String.format("%011d", i));
+            stmt.setInt(3, i);
+            stmt.executeUpdate();
+            processed = i + 1;
+          }
+          conn.commit();
+          commitCount++;
+          break; // All done
+        } catch (MutationLimitReachedException e) {
+          // Verify the exception is the expected type
+          assertEquals(SQLExceptionCode.MUTATION_LIMIT_REACHED.getErrorCode(), 
e.getErrorCode());
+
+          // Verify mutations were preserved - MutationState should have rows
+          MutationState state = conn.getMutationState();
+          assertTrue("Mutations should be preserved", state.getNumRows() > 0);
+
+          // Commit what we have so far
+          conn.commit();
+          commitCount++;
+          // Loop continues from 'processed' index
+        }
+      }
+
+      // Should have required multiple commits due to limit
+      assertTrue("Should have committed multiple times", commitCount == 3);
+      verifyRowCount(conn, fullTableName, totalRows);
+    }
+  }
+
+  /**
+   * Tests that when preserveOnLimitExceeded=true, executeBatch() throws 
MutationLimitBatchException
+   * with proper checkpoint info, allowing recovery.
+   */
+  @Test
+  public void testExecuteBatchPreserveOnLimitExceeded() throws Exception {
+    String fullTableName = generateUniqueName();
+    int maxRows = 5;
+
+    // Create table
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL);
+    }
+
+    try (PhoenixConnection conn = (PhoenixConnection) 
DriverManager.getConnection(getUrl(),
+      createPreserveModeProps(maxRows, 0))) {
+      conn.setAutoCommit(false);
+
+      PreparedStatement stmt = conn.prepareStatement(
+        "UPSERT INTO " + fullTableName + " (organization_id, entity_id, score) 
VALUES (?,?,?)");
+
+      int totalRows = 15;
+      int commitCount = 0;
+
+      // Add all rows to batch
+      for (int i = 0; i < totalRows; i++) {
+        stmt.setString(1, "ORG" + String.format("%011d", i));
+        stmt.setString(2, "ENT" + String.format("%011d", i));
+        stmt.setInt(3, i);
+        stmt.addBatch();
+      }
+
+      // Execute batch - should hit limit and throw MutationLimitBatchException
+      while (true) {
+        try {
+          stmt.executeBatch();
+          conn.commit();
+          commitCount++;
+          break; // All done
+        } catch (MutationLimitBatchException e) {
+          // Verify we got processedCount
+          assertTrue("ProcessedCount should be > 0", e.getProcessedCount() > 
0);
+
+          // Commit what was successfully buffered
+          conn.commit();
+          commitCount++;
+          // executeBatch() trims the batch, so just retry
+        }
+      }
+
+      // Should have required multiple commits
+      assertTrue("Should have committed multiple times", commitCount == 3);
+      verifyRowCount(conn, fullTableName, totalRows);
+    }
+  }
+
+  /**
+   * Tests executeBatch() with autoCommit=true and 
preserveOnLimitExceeded=true. In this case,
+   * executeBatch() should auto-commit on limit and trim batch.
+   */
+  @Test
+  public void testExecuteBatchAutoCommitPreserveOnLimitExceeded() throws 
Exception {
+    String fullTableName = generateUniqueName();
+    int maxRows = 5;
+
+    // Create table
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL);
+    }
+
+    try (PhoenixConnection conn = (PhoenixConnection) 
DriverManager.getConnection(getUrl(),
+      createPreserveModeProps(maxRows, 0))) {
+      conn.setAutoCommit(true); // autoCommit is true
+
+      PreparedStatement stmt = conn.prepareStatement(
+        "UPSERT INTO " + fullTableName + " (organization_id, entity_id, score) 
VALUES (?,?,?)");
+
+      int totalRows = 15;
+      int exceptionCount = 0;
+
+      // Add all rows to batch
+      for (int i = 0; i < totalRows; i++) {
+        stmt.setString(1, "ORG" + String.format("%011d", i));
+        stmt.setString(2, "ENT" + String.format("%011d", i));
+        stmt.setInt(3, i);
+        stmt.addBatch();
+      }
+
+      // Execute batch - with autoCommit=true, it should auto-commit and trim
+      while (true) {
+        try {
+          stmt.executeBatch();
+          break; // All done
+        } catch (MutationLimitBatchException e) {
+          exceptionCount++;
+          // With autoCommit=true, mutations were already committed
+          // Batch is trimmed, just retry
+        }
+      }
+
+      // Should have hit limit at least once
+      assertTrue("Should have hit limit at least once", exceptionCount > 0);
+      verifyRowCount(conn, fullTableName, totalRows);
+    }
+  }
+
+  /**
+   * Tests that when preserveOnLimitExceeded=false (default), the old behavior 
is maintained -
+   * mutations are cleared on limit exceeded.
+   */
+  @Test
+  public void testExecuteUpdateDefaultBehaviorClearsMutations() throws 
Exception {
+    String fullTableName = generateUniqueName();
+    int maxRows = 5;
+
+    // Create table
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL);
+    }
+
+    // Test with preserveOnLimitExceeded=false (default)
+    Properties props = new Properties();
+    props.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, 
String.valueOf(maxRows));
+    // Not setting PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB, should default 
to false
+
+    try (
+      PhoenixConnection conn = (PhoenixConnection) 
DriverManager.getConnection(getUrl(), props)) {
+      conn.setAutoCommit(false);
+
+      PreparedStatement stmt = conn.prepareStatement(
+        "UPSERT INTO " + fullTableName + " (organization_id, entity_id, score) 
VALUES (?,?,?)");
+
+      try {
+        for (int i = 0; i < 20; i++) {
+          stmt.setString(1, "ORG" + String.format("%011d", i));
+          stmt.setString(2, "ENT" + String.format("%011d", i));
+          stmt.setInt(3, i);
+          stmt.executeUpdate();
+        }
+        fail("Should have thrown MaxMutationSizeExceededException");
+      } catch (SQLException e) {
+        // Should be the old exception type, not MutationLimitReachedException
+        
assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED.getErrorCode(), 
e.getErrorCode());
+
+        // Verify mutations were cleared (old behavior)
+        MutationState state = conn.getMutationState();
+        assertEquals("Mutations should have been cleared", 0, 
state.getNumRows());
+      }
+      verifyRowCount(conn, fullTableName, 0);
+    }
+  }
+
+  /**
+   * Tests byte size limit with preserveOnLimitExceeded=true.
+   */
+  @Test
+  public void testExecuteUpdatePreserveOnByteLimitExceeded() throws Exception {
+    String fullTableName = generateUniqueName();
+
+    // Create table with a VARCHAR column for large values
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL);
+    }
+
+    // High row limit, low byte limit (~3-4 rows at ~900+ bytes each)
+    try (PhoenixConnection conn = (PhoenixConnection) 
DriverManager.getConnection(getUrl(),
+      createPreserveModeProps(1000000, 3000))) {
+      conn.setAutoCommit(false);
+
+      PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
fullTableName
+        + " (organization_id, entity_id, score, tags) VALUES (?,?,?,?)");
+
+      int totalRows = 10;
+      int processed = 0;
+      int commitCount = 0;
+      String largeValue = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; // ~36 bytes 
per row in tags
+
+      while (processed < totalRows) {
+        try {
+          for (int i = processed; i < totalRows; i++) {
+            stmt.setString(1, "ORG" + String.format("%011d", i));
+            stmt.setString(2, "ENT" + String.format("%011d", i));
+            stmt.setInt(3, i);
+            stmt.setString(4, largeValue + i);
+            stmt.executeUpdate();
+            processed = i + 1;
+          }
+          conn.commit();
+          commitCount++;
+          break;
+        } catch (MutationLimitReachedException e) {
+          assertEquals(SQLExceptionCode.MUTATION_LIMIT_REACHED.getErrorCode(), 
e.getErrorCode());
+          conn.commit();
+          commitCount++;
+        }
+      }
+
+      // Should have required multiple commits due to byte limit
+      assertTrue("Should have committed multiple times due to byte limit", 
commitCount > 1);
+      verifyRowCount(conn, fullTableName, totalRows);
+    }
+  }
+
+  /**
+   * Tests that when preserveOnLimitExceeded is true but a single mutation 
exceeds the limit
+   * (constructor case), we still get the old exception type since there's no 
existing state to
+   * preserve.
+   */
+  @Test
+  public void testSingleMutationExceedsLimitWithPreserveOption() throws 
Exception {
+    String fullTableName = generateUniqueName();
+    String sourceTable = generateUniqueName();
+
+    // Use a connection without limits to create tables and set up test data
+    try (Connection setupConn = DriverManager.getConnection(getUrl())) {
+      setupConn.setAutoCommit(true);
+      try (Statement stmt = setupConn.createStatement()) {
+        stmt.execute("CREATE TABLE " + fullTableName + DDL);
+        stmt.execute("CREATE TABLE " + sourceTable + " (id VARCHAR PRIMARY 
KEY, val INTEGER)");
+        // Insert multiple rows into source table
+        stmt.executeUpdate("UPSERT INTO " + sourceTable + " VALUES ('a', 1)");
+        stmt.executeUpdate("UPSERT INTO " + sourceTable + " VALUES ('b', 2)");
+      }
+    }
+
+    // Now use a connection with the limit set
+    try (PhoenixConnection conn =
+      (PhoenixConnection) DriverManager.getConnection(getUrl(), 
createPreserveModeProps(1, 0))) {
+      conn.setAutoCommit(false);
+
+      // Insert a row to have some existing state in the connection's 
MutationState
+      try (PreparedStatement stmt = conn.prepareStatement(
+        "UPSERT INTO " + fullTableName + " (organization_id, entity_id, score) 
VALUES (?, ?, ?)")) {
+        stmt.setString(1, "org1");
+        stmt.setString(2, "entity1");
+        stmt.setInt(3, 1);
+        stmt.executeUpdate();
+      }
+
+      // Now do a SELECT-based UPSERT that produces multiple rows in a single 
mutation.
+      // This creates a new MutationState in the constructor that already 
exceeds the limit.
+      // Since there's no state to preserve in that NEW MutationState (only 1 
row was inserted
+      // into the connection's existing state), we should get the old 
exception type
+      // (MaxMutationSizeExceededException), not MutationLimitReachedException.
+      try (Statement stmt = conn.createStatement()) {
+        try {
+          stmt.executeUpdate("UPSERT INTO " + fullTableName
+            + " (organization_id, entity_id, score) " + "SELECT id, 'e', val 
FROM " + sourceTable);
+          fail("Expected MaxMutationSizeExceededException");
+        } catch (MaxMutationSizeExceededException e) {
+          // Expected - legacy exception even though preserveOnLimitExceeded 
is true,
+          // because the single mutation (from constructor) has no prior state 
to preserve
+        } catch (MutationLimitReachedException e) {
+          fail(
+            "Should have thrown MaxMutationSizeExceededException, not 
MutationLimitReachedException. "
+              + "When a single mutation exceeds the limit, there's no state to 
preserve.");
+        }
+      }
+      MutationState state = conn.getMutationState();
+      assertEquals("Mutation should not have been cleared", 1, 
state.getNumRows());
+    }
+  }
+
+  /**
+   * Tests that when preserveOnLimitExceeded=true and the limit is reached 
during row merge (same
+   * row key updated multiple times), the mutations are preserved. This 
specifically tests the 3rd
+   * case in joinMutationState where rows are merged.
+   */
+  @Test
+  public void testRowMergePreserveOnLimitExceeded() throws Exception {
+    String fullTableName = generateUniqueName();
+
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL);
+    }
+
+    // Low byte limit so merging rows with increasing sizes triggers limit
+    try (PhoenixConnection conn = (PhoenixConnection) 
DriverManager.getConnection(getUrl(),
+      createPreserveModeProps(1000, 1500))) {
+      conn.setAutoCommit(false);
+
+      PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
fullTableName
+        + " (organization_id, entity_id, score, tags) VALUES (?,?,?,?)");
+
+      int commitCount = 0;
+      int totalUpdates = 0;
+      String baseKey = "ORG000000000001";
+      String entityKey = "ENT000000000001";
+      int maxScore = 50;
+
+      // Keep updating the SAME row with larger values until limit reached
+      // Each update merges with the existing row, increasing the size
+      while (totalUpdates <= maxScore) {
+        try {
+          for (int i = totalUpdates; i <= maxScore; i++) {
+            stmt.setString(1, baseKey);
+            stmt.setString(2, entityKey);
+            stmt.setInt(3, i);
+            // Increasing size with each update - this will eventually exceed 
byte limit
+            StringBuilder sb = new StringBuilder("update" + i + "_");
+            for (int j = 0; j < i * 10; j++) {
+              sb.append("X");
+            }
+            stmt.setString(4, sb.toString());
+            stmt.executeUpdate();
+            totalUpdates = i + 1;
+          }
+          conn.commit();
+          commitCount++;
+          break;
+        } catch (MutationLimitReachedException e) {
+          // Mutations should be preserved - verify we have state
+          MutationState state = conn.getMutationState();
+          assertTrue("Mutations should be preserved on limit exceeded",
+            state.getNumRows() > 0 || state.getEstimatedSize() > 0);
+          conn.commit();
+          commitCount++;
+        }
+      }
+
+      // Should have hit limit and committed multiple times due to row merge 
size growth
+      assertTrue("Should have committed multiple times", commitCount > 1);
+
+      // Verify the row exists with the latest value
+      try (ResultSet rs = conn.createStatement().executeQuery("SELECT score 
FROM " + fullTableName
+        + " WHERE organization_id = '" + baseKey + "' AND entity_id = '" + 
entityKey + "'")) {
+        assertTrue("Row should exist", rs.next());
+        // Score should be the last successfully committed value
+        assertEquals("Score should be set to the max value", maxScore, 
rs.getInt(1));
+      }
+    }
+  }
+
+  /**
+   * Tests that when preserveOnLimitExceeded=true and the limit is reached 
during conflicting row
+   * addition (regular upsert vs ON DUPLICATE KEY upsert on same key), the 
mutations are preserved.
+   * This tests the conflict case in joinMutationState.
+   */
+  @Test
+  public void testConflictingRowsPreserveOnLimitExceeded() throws Exception {
+    String fullTableName = generateUniqueName();
+
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL);
+    }
+
+    // Low byte limit so adding conflicting rows triggers limit
+    try (PhoenixConnection conn = (PhoenixConnection) 
DriverManager.getConnection(getUrl(),
+      createPreserveModeProps(1000, 2000))) {
+      conn.setAutoCommit(false);
+
+      // Regular UPSERT statement
+      PreparedStatement regularStmt = conn.prepareStatement("UPSERT INTO " + 
fullTableName
+        + " (organization_id, entity_id, score, tags) VALUES (?,?,?,?)");
+
+      // ON DUPLICATE KEY UPSERT statement - conflicts with regular upsert
+      PreparedStatement onDupKeyStmt = conn.prepareStatement(
+        "UPSERT INTO " + fullTableName + " (organization_id, entity_id, score) 
VALUES (?,?,?)"
+          + " ON DUPLICATE KEY UPDATE score = score + 1");
+
+      int commitCount = 0;
+      int totalRows = 0;
+      String orgKey = "ORG000000000001";
+
+      // Alternate between regular upserts and ON DUPLICATE KEY upserts on 
same keys
+      // This creates conflicting rows that can't be merged
+      while (totalRows < 30) {
+        try {
+          for (int i = totalRows; i < 30; i++) {
+            String entityKey = String.format("ENT%012d", i);
+
+            // First do a regular upsert
+            regularStmt.setString(1, orgKey);
+            regularStmt.setString(2, entityKey);
+            regularStmt.setInt(3, i);
+            StringBuilder sb = new StringBuilder("data" + i + "_");
+            for (int j = 0; j < 20; j++) {
+              sb.append("X");
+            }
+            regularStmt.setString(4, sb.toString());
+            regularStmt.executeUpdate();
+
+            // Then do an ON DUPLICATE KEY upsert on the same key - this 
conflicts
+            onDupKeyStmt.setString(1, orgKey);
+            onDupKeyStmt.setString(2, entityKey);
+            onDupKeyStmt.setInt(3, i * 10);
+            onDupKeyStmt.executeUpdate();
+
+            totalRows = i + 1;
+          }
+          conn.commit();
+          commitCount++;
+          break;
+        } catch (MutationLimitReachedException e) {
+          // Mutations should be preserved - verify we have state
+          MutationState state = conn.getMutationState();
+          assertTrue("Mutations should be preserved on limit exceeded",
+            state.getNumRows() > 0 || state.getEstimatedSize() > 0);
+          conn.commit();
+          commitCount++;
+        }
+      }
+
+      // Should have hit limit and committed multiple times due to conflicting 
row size
+      assertTrue("Should have committed multiple times", commitCount > 1);
+      // Verify rows exist
+      verifyRowCount(conn, fullTableName, totalRows);
+    }
+  }
+
   @Test
   public void testMutationEstimatedSize() throws Exception {
     PhoenixConnection conn = (PhoenixConnection) 
DriverManager.getConnection(getUrl());
@@ -380,6 +877,35 @@ public class MutationStateIT extends 
ParallelStatsDisabledIT {
     stmt.execute();
     assertTrue("Mutation state size should decrease",
       prevEstimatedSize + 4 > state.getEstimatedSize());
+
+    // Test that estimatedSize increases when adding conflicting rows
+    // (regular upsert + ON DUPLICATE KEY upsert on same key cannot be merged)
+    conn.commit(); // clear state
+    assertEquals("Mutation state size should be zero after commit", 0, 
state.getEstimatedSize());
+
+    // Regular upsert
+    stmt = conn.prepareStatement(
+      "upsert into " + fullTableName + " (organization_id, entity_id, score) 
values (?,?,?)");
+    stmt.setString(1, "AAAA");
+    stmt.setString(2, "BBBB");
+    stmt.setInt(3, 1);
+    stmt.execute();
+    long sizeAfterRegularUpsert = state.getEstimatedSize();
+    assertTrue("Mutation state size should be > 0 after regular upsert",
+      sizeAfterRegularUpsert > 0);
+
+    // ON DUPLICATE KEY upsert on same key - creates a conflicting row that 
can't be merged
+    PreparedStatement onDupStmt = conn.prepareStatement(
+      "upsert into " + fullTableName + " (organization_id, entity_id, score) 
values (?,?,?)"
+        + " ON DUPLICATE KEY UPDATE score = score + 1");
+    onDupStmt.setString(1, "AAAA");
+    onDupStmt.setString(2, "BBBB");
+    onDupStmt.setInt(3, 2);
+    onDupStmt.execute();
+
+    // Size should increase because conflicting row goes into a separate batch
+    assertTrue("Estimated size should increase for conflicting row",
+      state.getEstimatedSize() > sizeAfterRegularUpsert);
   }
 
   @Test

Reply via email to