This is an automated email from the ASF dual-hosted git repository.
haridsv pushed a commit to branch 5.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.2 by this push:
new 4d3077d4c7 PHOENIX-7759: Preserve buffered mutations when batch size
limit is exceeded (#2371) (#2374)
4d3077d4c7 is described below
commit 4d3077d4c705e8181f1255bb19cb98600cd1e031
Author: Hari Krishna Dara <[email protected]>
AuthorDate: Tue Feb 17 09:29:59 2026 +0530
PHOENIX-7759: Preserve buffered mutations when batch size limit is exceeded
(#2371) (#2374)
- Adds new configuration property `phoenix.mutate.preserveOnLimitExceeded`
that changes mutation limit behavior, which can be overridden at the connection
level via connection properties.
- When enabled, limit checks occur BEFORE state modification, preserving
existing buffered mutations
- Introduces `MutationLimitReachedException` (for executeUpdate) and
`MutationLimitBatchException` (for executeBatch) to signal recoverable limit
conditions
- Allows applications to commit buffered mutations and continue processing
instead of losing state, which facilitates **"dynamic sizing"** for the batch.
---
.../apache/phoenix/exception/SQLExceptionCode.java | 3 +
.../org/apache/phoenix/execute/MutationState.java | 127 +++--
.../phoenix/jdbc/MutationLimitBatchException.java | 50 ++
.../org/apache/phoenix/jdbc/PhoenixStatement.java | 21 +
.../org/apache/phoenix/query/QueryServices.java | 2 +
.../apache/phoenix/query/QueryServicesOptions.java | 1 +
.../schema/MutationLimitReachedException.java | 37 ++
.../apache/phoenix/util/PhoenixKeyValueUtil.java | 15 -
.../apache/phoenix/end2end/MutationStateIT.java | 526 +++++++++++++++++++++
9 files changed, 734 insertions(+), 48 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index ecf4e7ebac..60c034c44e 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -691,6 +691,9 @@ public enum SQLExceptionCode {
info.getMaxPhoenixColumnSizeBytes(),
info.getPhoenixColumnSizeBytes());
}
}),
+ MUTATION_LIMIT_REACHED(733, "LIM04",
+ "Mutation buffer limit reached. Existing mutations are preserved. "
+ + "Commit current mutations and retry the failed operation."),
INSUFFICIENT_MEMORY(999, "50M01", "Unable to allocate enough memory."),
HASH_JOIN_CACHE_NOT_FOUND(900, "HJ01", "Hash Join cache not found"),
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
index f99f6ce74e..6db8d3533a 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -34,6 +34,11 @@ import static
org.apache.phoenix.query.QueryServices.INDEX_REGION_OBSERVER_ENABL
import static org.apache.phoenix.query.QueryServices.SOURCE_OPERATION_ATTRIB;
import static
org.apache.phoenix.query.QueryServices.WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB;
import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_INDEX_REGION_OBSERVER_ENABLED_ALL_TABLES;
+import static
org.apache.phoenix.query.QueryServices.PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.SOURCE_OPERATION_ATTRIB;
+import static
org.apache.phoenix.query.QueryServices.WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB;
+import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_INDEX_REGION_OBSERVER_ENABLED_ALL_TABLES;
+import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED;
import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB;
import static
org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkNotNull;
@@ -95,6 +100,7 @@ import org.apache.phoenix.schema.IllegalDataException;
import org.apache.phoenix.schema.MaxMutationSizeBytesExceededException;
import org.apache.phoenix.schema.MaxMutationSizeExceededException;
import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.MutationLimitReachedException;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PMetaData;
@@ -178,6 +184,7 @@ public class MutationState implements SQLCloseable {
private static boolean allDeletesMutations = true;
private final boolean indexRegionObserverEnabledAllTables;
+ private final boolean preserveOnLimitExceeded;
public static void resetAllMutationState() {
allDeletesMutations = true;
@@ -245,6 +252,8 @@ public class MutationState implements SQLCloseable {
this.indexRegionObserverEnabledAllTables =
Boolean.parseBoolean(this.connection
.getQueryServices().getConfiguration().get(INDEX_REGION_OBSERVER_ENABLED_ALL_TABLES_ATTRIB,
DEFAULT_INDEX_REGION_OBSERVER_ENABLED_ALL_TABLES));
+ this.preserveOnLimitExceeded =
this.connection.getQueryServices().getProps().getBoolean(
+ PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB,
DEFAULT_PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED);
}
public MutationState(TableRef table, MultiRowMutationState mutations, long
sizeOffset,
@@ -484,6 +493,17 @@ public class MutationState implements SQLCloseable {
}
}
+ /**
+ * Pre-check for preserve mode: throws MutationLimitReachedException if
adding the given
+ * rows/bytes would exceed limits, which prevents the caller from modifying
the state.
+ */
+ private void throwIfLimitWouldBeExceeded(int additionalRows, long
additionalBytes)
+ throws SQLException {
+ if (numRows + additionalRows > maxSize || estimatedSize + additionalBytes
> maxSizeBytes) {
+ throw new MutationLimitReachedException();
+ }
+ }
+
public long getUpdateCount() {
return sizeOffset + numRows;
}
@@ -506,20 +526,25 @@ public class MutationState implements SQLCloseable {
}
private void joinMutationState(TableRef tableRef, MultiRowMutationState
srcRows,
- Map<TableRef, List<MultiRowMutationState>> dstMutations) {
+ Map<TableRef, List<MultiRowMutationState>> dstMutations) throws
SQLException {
PTable table = tableRef.getTable();
boolean isIndex = table.getType() == PTableType.INDEX;
boolean incrementRowCount = dstMutations == this.mutationsMap;
+ boolean impactsLimits = incrementRowCount && !isIndex;
// we only need to check if the new mutation batch (srcRows) conflicts
with the
// last mutation batch since we try to merge it with that only
MultiRowMutationState existingRows = getLastMutationBatch(dstMutations,
tableRef);
if (existingRows == null) { // no rows found for this table
+ // For preserve mode, check limits BEFORE modifying any state
+ if (impactsLimits && preserveOnLimitExceeded) {
+ throwIfLimitWouldBeExceeded(srcRows.size(), srcRows.estimatedSize);
+ }
// Size new map at batch size as that's what it'll likely grow to.
MultiRowMutationState newRows = new
MultiRowMutationState(connection.getMutateBatchSize());
newRows.putAll(srcRows);
addMutations(dstMutations, tableRef, newRows);
- if (incrementRowCount && !isIndex) {
+ if (impactsLimits) {
numRows += srcRows.size();
// if we added all the rows from newMutationState we can just
increment the
// estimatedSize by newMutationState.estimatedSize
@@ -531,6 +556,7 @@ public class MutationState implements SQLCloseable {
// for conflicting rows
MultiRowMutationState conflictingRows =
new MultiRowMutationState(connection.getMutateBatchSize());
+ long conflictingRowsTotalSize = 0;
// Rows for this table already exist, check for conflicts
for (Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry :
srcRows.entrySet()) {
@@ -538,29 +564,43 @@ public class MutationState implements SQLCloseable {
RowMutationState newRowMutationState = rowEntry.getValue();
RowMutationState existingRowMutationState = existingRows.get(key);
if (existingRowMutationState == null) {
+ // For preserve mode, check limits BEFORE modifying any state
+ long newRowSize = newRowMutationState.calculateEstimatedSize();
+ if (impactsLimits && preserveOnLimitExceeded) {
+ throwIfLimitWouldBeExceeded(1, newRowSize);
+ }
existingRows.put(key, newRowMutationState);
- if (incrementRowCount && !isIndex) { // Don't count index rows in row
count
+ if (impactsLimits) { // Don't count index rows in row count
numRows++;
// increment estimated size by the size of the new row
- estimatedSize += newRowMutationState.calculateEstimatedSize();
+ estimatedSize += newRowSize;
}
continue;
}
+
Map<PColumn, byte[]> existingValues =
existingRowMutationState.getColumnValues();
Map<PColumn, byte[]> newValues = newRowMutationState.getColumnValues();
if (existingValues != PRow.DELETE_MARKER && newValues !=
PRow.DELETE_MARKER) {
- // Check if we can merge existing column values with new column values
- long beforeMergeSize =
existingRowMutationState.calculateEstimatedSize();
- boolean isMerged = existingRowMutationState.join(rowEntry.getValue());
- if (isMerged) {
- // decrement estimated size by the size of the old row
- estimatedSize -= beforeMergeSize;
- // increment estimated size by the size of the new row
- estimatedSize += existingRowMutationState.calculateEstimatedSize();
+ // Check if we can merge existing column values with new column values.
+ // For preserve mode, pass this instance so join() can check limits
before modification.
+ Long sizeDiff =
+ existingRowMutationState.join(newRowMutationState,
preserveOnLimitExceeded ? this : null);
+ if (sizeDiff != null) {
+ // Merged successfully (row count unchanged - same row key)
+ estimatedSize += sizeDiff;
} else {
// cannot merge regular upsert and conditional upsert
- // conflicting row is not a new row so no need to increment numRows
+ // conflicting row goes into a separate batch (same key, different
semantics)
+ // Row count unchanged (same key), but size increases
+ long conflictingRowSize =
newRowMutationState.calculateEstimatedSize();
+ if (impactsLimits && preserveOnLimitExceeded) {
+ // Include already-accumulated conflicting rows size in the check
+ throwIfLimitWouldBeExceeded(0, conflictingRowsTotalSize +
conflictingRowSize);
+ }
conflictingRows.put(key, newRowMutationState);
+ if (impactsLimits) {
+ conflictingRowsTotalSize += conflictingRowSize;
+ }
}
} else {
existingRows.put(key, newRowMutationState);
@@ -569,11 +609,13 @@ public class MutationState implements SQLCloseable {
if (!conflictingRows.isEmpty()) {
addMutations(dstMutations, tableRef, conflictingRows);
+ // Update estimatedSize only after actual state change
+ estimatedSize += conflictingRowsTotalSize;
}
}
private void joinMutationState(Map<TableRef, List<MultiRowMutationState>>
srcMutations,
- Map<TableRef, List<MultiRowMutationState>> dstMutations) {
+ Map<TableRef, List<MultiRowMutationState>> dstMutations) throws
SQLException {
// Merge newMutation with this one, keeping state from newMutation for any
overlaps
for (Map.Entry<TableRef, List<MultiRowMutationState>> entry :
srcMutations.entrySet()) {
TableRef tableRef = entry.getKey();
@@ -1507,7 +1549,6 @@ public class MutationState implements SQLCloseable {
"Sent batch of " + mutationBatch.size() + " for " +
Bytes.toString(htableName));
}
child.stop();
- child.stop();
shouldRetry = false;
numFailedMutations = 0;
@@ -2286,33 +2327,53 @@ public class MutationState implements SQLCloseable {
/**
* Join the newRow with the current row if it doesn't conflict with it. A
regular upsert
- * conflicts with a conditional upsert
- * @return True if the rows were successfully joined else False
+ * conflicts with a conditional upsert.
+ * @param mutationState if non-null, checks limits before modification and
throws
+ * MutationLimitReachedException if size increase
would exceed limits
+ * @return the size change (can be 0, positive, or negative) if merged, or
null if conflicting
*/
- boolean join(RowMutationState newRow) {
+ Long join(RowMutationState newRow, MutationState mutationState) throws
SQLException {
if (isConflicting(newRow)) {
- return false;
+ return null;
}
- // If we already have a row and the new row has an ON DUPLICATE KEY
clause
- // ignore the new values (as that's what the server will do).
+
+ // Pre-compute merged results (no side effects - these return new
objects)
+ byte[] combinedOnDupKey =
+ PhoenixIndexBuilderHelper.combineOnDupKey(this.onDupKeyBytes,
newRow.onDupKeyBytes);
+ int[] mergedIndexes = joinSortedIntArrays(statementIndexes,
newRow.getStatementIndexes());
+
+ // Calculate column values size change
+ long colValuesSizeDiff = 0;
if (newRow.onDupKeyBytes == null) {
- // increment the column value size by the new row column value size
- colValuesSize += newRow.colValuesSize;
+ colValuesSizeDiff = newRow.colValuesSize;
for (Map.Entry<PColumn, byte[]> entry :
newRow.columnValues.entrySet()) {
- PColumn col = entry.getKey();
- byte[] oldValue = columnValues.put(col, entry.getValue());
+ byte[] oldValue = columnValues.get(entry.getKey());
if (oldValue != null) {
- // decrement column value size by the size of all column values
that were replaced
- colValuesSize -= (col.getEstimatedSize() + oldValue.length);
+ colValuesSizeDiff -= entry.getKey().getEstimatedSize() +
oldValue.length;
}
}
}
- // Concatenate ON DUPLICATE KEY bytes to allow multiple
- // increments of the same row in the same commit batch.
- this.onDupKeyBytes =
- PhoenixIndexBuilderHelper.combineOnDupKey(this.onDupKeyBytes,
newRow.onDupKeyBytes);
- statementIndexes = joinSortedIntArrays(statementIndexes,
newRow.getStatementIndexes());
- return true;
+
+ // Total size change (can be negative)
+ long totalSizeDiff = colValuesSizeDiff
+ + ((combinedOnDupKey != null ? combinedOnDupKey.length : 0)
+ - (this.onDupKeyBytes != null ? this.onDupKeyBytes.length : 0))
+ + (mergedIndexes.length - statementIndexes.length) *
SizedUtil.INT_SIZE;
+
+ // Check limit BEFORE any modification (row count unchanged for merge -
same row key)
+ if (mutationState != null) {
+ mutationState.throwIfLimitWouldBeExceeded(0, totalSizeDiff);
+ }
+
+ // Apply modifications
+ this.onDupKeyBytes = combinedOnDupKey;
+ this.statementIndexes = mergedIndexes;
+ if (newRow.onDupKeyBytes == null) {
+ columnValues.putAll(newRow.columnValues);
+ colValuesSize += colValuesSizeDiff;
+ }
+
+ return totalSizeDiff;
}
@Nonnull
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/MutationLimitBatchException.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/MutationLimitBatchException.java
new file mode 100644
index 0000000000..f532d109b0
--- /dev/null
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/MutationLimitBatchException.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import java.sql.BatchUpdateException;
+import org.apache.phoenix.schema.MutationLimitReachedException;
+
+/**
+ * Thrown from executeBatch() when the mutation buffer limit is reached. The
batch is automatically
+ * trimmed to contain only unprocessed items.
+ */
+public class MutationLimitBatchException extends BatchUpdateException {
+ private static final long serialVersionUID = 1L;
+
+ private final int processedCount;
+
+ /**
+ * @param updateCounts array of update counts for each statement in the
batch
+ * @param cause the underlying MutationLimitReachedException
+ * @param processedCount number of statements successfully processed
+ */
+ public MutationLimitBatchException(int[] updateCounts,
MutationLimitReachedException cause,
+ int processedCount) {
+ super(cause.getMessage(), cause.getSQLState(), cause.getErrorCode(),
updateCounts, cause);
+ this.processedCount = processedCount;
+ }
+
+ /**
+ * Returns the number of statements that were successfully processed before
the limit was reached.
+ * The batch has been trimmed to contain only the remaining unprocessed
items.
+ */
+ public int getProcessedCount() {
+ return processedCount;
+ }
+}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 45610c5fa4..424c7bae16 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -194,6 +194,7 @@ import
org.apache.phoenix.schema.ExecuteUpdateNotApplicableException;
import org.apache.phoenix.schema.FunctionNotFoundException;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.MetaDataEntityNotFoundException;
+import org.apache.phoenix.schema.MutationLimitReachedException;
import org.apache.phoenix.schema.PColumnImpl;
import org.apache.phoenix.schema.PDatum;
import org.apache.phoenix.schema.PIndexState;
@@ -2170,6 +2171,26 @@ public class PhoenixStatement implements
PhoenixMonitoredStatement, SQLCloseable
connection.commit();
}
return returnCodes;
+ } catch (MutationLimitReachedException limitEx) {
+ // Special handling: limit reached but existing mutations preserved
+ // Statements 0 through i-1 were successfully added to MutationState
+ // Statement at index i was NOT added (its join was rejected by the
pre-check)
+
+ // If original autoCommit was true, commit what we have buffered
+ if (autoCommit) {
+ connection.commit();
+ }
+
+ // Trim the batch list to contain only unprocessed items (from index i
to end)
+ if (i > 0) {
+ batch.subList(0, i).clear();
+ }
+
+ // Mark the failed index
+ returnCodes[i] = Statement.EXECUTE_FAILED;
+
+ // Throw MutationLimitBatchException with checkpoint information
+ throw new MutationLimitBatchException(returnCodes, limitEx, i);
} catch (SQLException t) {
if (i == returnCodes.length) {
// Exception after for loop, perhaps in commit(), discard returnCodes.
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index 5ed24f77a6..32fa19eb2f 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -97,6 +97,8 @@ public interface QueryServices extends SQLCloseable {
public static final String SCAN_CACHE_SIZE_ATTRIB =
"hbase.client.scanner.caching";
public static final String MAX_MUTATION_SIZE_ATTRIB =
"phoenix.mutate.maxSize";
public static final String MAX_MUTATION_SIZE_BYTES_ATTRIB =
"phoenix.mutate.maxSizeBytes";
+ public static final String PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB =
+ "phoenix.mutate.preserveOnLimitExceeded";
public static final String HBASE_CLIENT_KEYVALUE_MAXSIZE =
"hbase.client.keyvalue.maxsize";
public static final String MUTATE_BATCH_SIZE_ATTRIB =
"phoenix.mutate.batchSize";
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index c4c4410856..ac0b0190f2 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -169,6 +169,7 @@ public class QueryServicesOptions {
public static final boolean DEFAULT_CALL_QUEUE_ROUND_ROBIN = true;
public static final int DEFAULT_MAX_MUTATION_SIZE = 500000;
public static final int DEFAULT_MAX_MUTATION_SIZE_BYTES = 104857600; // 100
Mb
+ public static final boolean DEFAULT_PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED =
false;
public static final int DEFAULT_HBASE_CLIENT_KEYVALUE_MAXSIZE = 10485760; //
10 Mb
public static final boolean DEFAULT_USE_INDEXES = true; // Use indexes
public static final boolean DEFAULT_IMMUTABLE_ROWS = false; // Tables rows
may be updated
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MutationLimitReachedException.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MutationLimitReachedException.java
new file mode 100644
index 0000000000..d6d750d774
--- /dev/null
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MutationLimitReachedException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import java.sql.SQLException;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+
+/**
+ * Exception thrown when a mutation operation would exceed the configured
mutation buffer size
+ * limit. Unlike MaxMutationSizeBytesExceededException, existing buffered
mutations are preserved
+ * and can be committed before retrying.
+ */
+public class MutationLimitReachedException extends SQLException {
+ private static final long serialVersionUID = 1L;
+ private static final SQLExceptionCode CODE =
SQLExceptionCode.MUTATION_LIMIT_REACHED;
+
+ public MutationLimitReachedException() {
+ super(new SQLExceptionInfo.Builder(CODE).build().toString(),
CODE.getSQLState(),
+ CODE.getErrorCode());
+ }
+}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java
index 4f4bbffa5e..a2070712e0 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java
@@ -173,21 +173,6 @@ public class PhoenixKeyValueUtil {
return size;
}
- /**
- * Estimates the storage size of a row
- * @param tableMutationMap map from table to row to RowMutationState
- * @return estimated row size
- */
- public static long
- getEstimatedRowMutationSize(Map<TableRef, MultiRowMutationState>
tableMutationMap) {
- long size = 0;
- // iterate over table
- for (Entry<TableRef, MultiRowMutationState> tableEntry :
tableMutationMap.entrySet()) {
- size += calculateMultiRowMutationSize(tableEntry.getValue());
- }
- return size;
- }
-
public static long getEstimatedRowMutationSizeWithBatch(
Map<TableRef, List<MultiRowMutationState>> tableMutationMap) {
long size = 0;
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
index c654601a4a..0b470acd14 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
@@ -45,8 +45,11 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.MutationLimitBatchException;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.MaxMutationSizeExceededException;
+import org.apache.phoenix.schema.MutationLimitReachedException;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.util.Repeat;
@@ -81,6 +84,31 @@ public class MutationStateIT extends ParallelStatsDisabledIT
{
}
}
+ /**
+ * Helper to create Properties for preserve mode tests.
+ * @param maxRows max mutation row count limit
+ * @param maxBytes max mutation byte size limit (0 to skip setting)
+ */
+ private Properties createPreserveModeProps(int maxRows, long maxBytes) {
+ Properties props = new Properties();
+ props.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB,
String.valueOf(maxRows));
+ if (maxBytes > 0) {
+ props.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,
String.valueOf(maxBytes));
+ }
+
props.setProperty(QueryServices.PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB,
"true");
+ return props;
+ }
+
+ /**
+ * Helper to verify the row count in a table.
+ */
+ private void verifyRowCount(Connection conn, String tableName, int expected)
throws SQLException {
+ try (ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*)
FROM " + tableName)) {
+ assertTrue("Should have results", rs.next());
+ assertEquals("Row count mismatch", expected, rs.getInt(1));
+ }
+ }
+
public static String randString(int length) {
return new BigInteger(164, RAND).toString().substring(0, length);
}
@@ -313,6 +341,475 @@ public class MutationStateIT extends
ParallelStatsDisabledIT {
}
}
+ /**
+ * Tests that when preserveOnLimitExceeded=true, executeUpdate() throws
+ * MutationLimitReachedException without clearing buffered mutations,
allowing the client to
+ * commit and retry.
+ */
+ @Test
+ public void testExecuteUpdatePreserveOnRowCountLimitExceeded() throws
Exception {
+ String fullTableName = generateUniqueName();
+ int maxRows = 5;
+
+ // Create table
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL);
+ }
+
+ try (PhoenixConnection conn = (PhoenixConnection)
DriverManager.getConnection(getUrl(),
+ createPreserveModeProps(maxRows, 0))) {
+ conn.setAutoCommit(false);
+
+ PreparedStatement stmt = conn.prepareStatement(
+ "UPSERT INTO " + fullTableName + " (organization_id, entity_id, score)
VALUES (?,?,?)");
+
+ int totalRows = 12;
+ int processed = 0;
+ int commitCount = 0;
+
+ while (processed < totalRows) {
+ try {
+ for (int i = processed; i < totalRows; i++) {
+ stmt.setString(1, "ORG" + String.format("%011d", i));
+ stmt.setString(2, "ENT" + String.format("%011d", i));
+ stmt.setInt(3, i);
+ stmt.executeUpdate();
+ processed = i + 1;
+ }
+ conn.commit();
+ commitCount++;
+ break; // All done
+ } catch (MutationLimitReachedException e) {
+ // Verify the exception is the expected type
+ assertEquals(SQLExceptionCode.MUTATION_LIMIT_REACHED.getErrorCode(),
e.getErrorCode());
+
+ // Verify mutations were preserved - MutationState should have rows
+ MutationState state = conn.getMutationState();
+ assertTrue("Mutations should be preserved", state.getNumRows() > 0);
+
+ // Commit what we have so far
+ conn.commit();
+ commitCount++;
+ // Loop continues from 'processed' index
+ }
+ }
+
+ // Should have required multiple commits due to limit
+ assertTrue("Should have committed multiple times", commitCount == 3);
+ verifyRowCount(conn, fullTableName, totalRows);
+ }
+ }
+
+ /**
+ * Tests that when preserveOnLimitExceeded=true, executeBatch() throws
MutationLimitBatchException
+ * with proper checkpoint info, allowing recovery.
+ */
+ @Test
+ public void testExecuteBatchPreserveOnLimitExceeded() throws Exception {
+ String fullTableName = generateUniqueName();
+ int maxRows = 5;
+
+ // Create table
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL);
+ }
+
+ try (PhoenixConnection conn = (PhoenixConnection)
DriverManager.getConnection(getUrl(),
+ createPreserveModeProps(maxRows, 0))) {
+ conn.setAutoCommit(false);
+
+ PreparedStatement stmt = conn.prepareStatement(
+ "UPSERT INTO " + fullTableName + " (organization_id, entity_id, score)
VALUES (?,?,?)");
+
+ int totalRows = 15;
+ int commitCount = 0;
+
+ // Add all rows to batch
+ for (int i = 0; i < totalRows; i++) {
+ stmt.setString(1, "ORG" + String.format("%011d", i));
+ stmt.setString(2, "ENT" + String.format("%011d", i));
+ stmt.setInt(3, i);
+ stmt.addBatch();
+ }
+
+ // Execute batch - should hit limit and throw MutationLimitBatchException
+ while (true) {
+ try {
+ stmt.executeBatch();
+ conn.commit();
+ commitCount++;
+ break; // All done
+ } catch (MutationLimitBatchException e) {
+ // Verify we got processedCount
+ assertTrue("ProcessedCount should be > 0", e.getProcessedCount() >
0);
+
+ // Commit what was successfully buffered
+ conn.commit();
+ commitCount++;
+ // executeBatch() trims the batch, so just retry
+ }
+ }
+
+ // Should have required multiple commits
+ assertTrue("Should have committed multiple times", commitCount == 3);
+ verifyRowCount(conn, fullTableName, totalRows);
+ }
+ }
+
+ /**
+ * Tests executeBatch() with autoCommit=true and
preserveOnLimitExceeded=true. In this case,
+ * executeBatch() should auto-commit on limit and trim batch.
+ */
+ @Test
+ public void testExecuteBatchAutoCommitPreserveOnLimitExceeded() throws
Exception {
+ String fullTableName = generateUniqueName();
+ int maxRows = 5;
+
+ // Create table
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL);
+ }
+
+ try (PhoenixConnection conn = (PhoenixConnection)
DriverManager.getConnection(getUrl(),
+ createPreserveModeProps(maxRows, 0))) {
+ conn.setAutoCommit(true); // autoCommit is true
+
+ PreparedStatement stmt = conn.prepareStatement(
+ "UPSERT INTO " + fullTableName + " (organization_id, entity_id, score)
VALUES (?,?,?)");
+
+ int totalRows = 15;
+ int exceptionCount = 0;
+
+ // Add all rows to batch
+ for (int i = 0; i < totalRows; i++) {
+ stmt.setString(1, "ORG" + String.format("%011d", i));
+ stmt.setString(2, "ENT" + String.format("%011d", i));
+ stmt.setInt(3, i);
+ stmt.addBatch();
+ }
+
+ // Execute batch - with autoCommit=true, it should auto-commit and trim
+ while (true) {
+ try {
+ stmt.executeBatch();
+ break; // All done
+ } catch (MutationLimitBatchException e) {
+ exceptionCount++;
+ // With autoCommit=true, mutations were already committed
+ // Batch is trimmed, just retry
+ }
+ }
+
+ // Should have hit limit at least once
+ assertTrue("Should have hit limit at least once", exceptionCount > 0);
+ verifyRowCount(conn, fullTableName, totalRows);
+ }
+ }
+
+ /**
+ * Tests that when preserveOnLimitExceeded=false (default), the old behavior
is maintained -
+ * mutations are cleared on limit exceeded.
+ */
+ @Test
+ public void testExecuteUpdateDefaultBehaviorClearsMutations() throws
Exception {
+ String fullTableName = generateUniqueName();
+ int maxRows = 5;
+
+ // Create table
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL);
+ }
+
+ // Test with preserveOnLimitExceeded=false (default)
+ Properties props = new Properties();
+ props.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB,
String.valueOf(maxRows));
+ // Not setting PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB, should default
to false
+
+ try (
+ PhoenixConnection conn = (PhoenixConnection)
DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(false);
+
+ PreparedStatement stmt = conn.prepareStatement(
+ "UPSERT INTO " + fullTableName + " (organization_id, entity_id, score)
VALUES (?,?,?)");
+
+ try {
+ for (int i = 0; i < 20; i++) {
+ stmt.setString(1, "ORG" + String.format("%011d", i));
+ stmt.setString(2, "ENT" + String.format("%011d", i));
+ stmt.setInt(3, i);
+ stmt.executeUpdate();
+ }
+ fail("Should have thrown MaxMutationSizeExceededException");
+ } catch (SQLException e) {
+ // Should be the old exception type, not MutationLimitReachedException
+
assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED.getErrorCode(),
e.getErrorCode());
+
+ // Verify mutations were cleared (old behavior)
+ MutationState state = conn.getMutationState();
+ assertEquals("Mutations should have been cleared", 0,
state.getNumRows());
+ }
+ verifyRowCount(conn, fullTableName, 0);
+ }
+ }
+
+ /**
+ * Tests byte size limit with preserveOnLimitExceeded=true.
+ */
+ @Test
+ public void testExecuteUpdatePreserveOnByteLimitExceeded() throws Exception {
+ String fullTableName = generateUniqueName();
+
+ // Create table with a VARCHAR column for large values
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL);
+ }
+
+ // High row limit, low byte limit (~3-4 rows at ~900+ bytes each)
+ try (PhoenixConnection conn = (PhoenixConnection)
DriverManager.getConnection(getUrl(),
+ createPreserveModeProps(1000000, 3000))) {
+ conn.setAutoCommit(false);
+
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " +
fullTableName
+ + " (organization_id, entity_id, score, tags) VALUES (?,?,?,?)");
+
+ int totalRows = 10;
+ int processed = 0;
+ int commitCount = 0;
+ String largeValue = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; // ~36 bytes
per row in tags
+
+ while (processed < totalRows) {
+ try {
+ for (int i = processed; i < totalRows; i++) {
+ stmt.setString(1, "ORG" + String.format("%011d", i));
+ stmt.setString(2, "ENT" + String.format("%011d", i));
+ stmt.setInt(3, i);
+ stmt.setString(4, largeValue + i);
+ stmt.executeUpdate();
+ processed = i + 1;
+ }
+ conn.commit();
+ commitCount++;
+ break;
+ } catch (MutationLimitReachedException e) {
+ assertEquals(SQLExceptionCode.MUTATION_LIMIT_REACHED.getErrorCode(),
e.getErrorCode());
+ conn.commit();
+ commitCount++;
+ }
+ }
+
+ // Should have required multiple commits due to byte limit
+ assertTrue("Should have committed multiple times due to byte limit",
commitCount > 1);
+ verifyRowCount(conn, fullTableName, totalRows);
+ }
+ }
+
+ /**
+ * Tests that when preserveOnLimitExceeded is true but a single mutation
exceeds the limit
+ * (constructor case), we still get the old exception type since there's no
existing state to
+ * preserve.
+ */
+ @Test
+ public void testSingleMutationExceedsLimitWithPreserveOption() throws
Exception {
+ String fullTableName = generateUniqueName();
+ String sourceTable = generateUniqueName();
+
+ // Use a connection without limits to create tables and set up test data
+ try (Connection setupConn = DriverManager.getConnection(getUrl())) {
+ setupConn.setAutoCommit(true);
+ try (Statement stmt = setupConn.createStatement()) {
+ stmt.execute("CREATE TABLE " + fullTableName + DDL);
+ stmt.execute("CREATE TABLE " + sourceTable + " (id VARCHAR PRIMARY
KEY, val INTEGER)");
+ // Insert multiple rows into source table
+ stmt.executeUpdate("UPSERT INTO " + sourceTable + " VALUES ('a', 1)");
+ stmt.executeUpdate("UPSERT INTO " + sourceTable + " VALUES ('b', 2)");
+ }
+ }
+
+ // Now use a connection with the limit set
+ try (PhoenixConnection conn =
+ (PhoenixConnection) DriverManager.getConnection(getUrl(),
createPreserveModeProps(1, 0))) {
+ conn.setAutoCommit(false);
+
+ // Insert a row to have some existing state in the connection's
MutationState
+ try (PreparedStatement stmt = conn.prepareStatement(
+ "UPSERT INTO " + fullTableName + " (organization_id, entity_id, score)
VALUES (?, ?, ?)")) {
+ stmt.setString(1, "org1");
+ stmt.setString(2, "entity1");
+ stmt.setInt(3, 1);
+ stmt.executeUpdate();
+ }
+
+ // Now do a SELECT-based UPSERT that produces multiple rows in a single
mutation.
+ // This creates a new MutationState in the constructor that already
exceeds the limit.
+ // Since there's no state to preserve in that NEW MutationState (only 1
row was inserted
+ // into the connection's existing state), we should get the old
exception type
+ // (MaxMutationSizeExceededException), not MutationLimitReachedException.
+ try (Statement stmt = conn.createStatement()) {
+ try {
+ stmt.executeUpdate("UPSERT INTO " + fullTableName
+ + " (organization_id, entity_id, score) " + "SELECT id, 'e', val
FROM " + sourceTable);
+ fail("Expected MaxMutationSizeExceededException");
+ } catch (MaxMutationSizeExceededException e) {
+ // Expected - legacy exception even though preserveOnLimitExceeded
is true,
+ // because the single mutation (from constructor) has no prior state
to preserve
+ } catch (MutationLimitReachedException e) {
+ fail(
+ "Should have thrown MaxMutationSizeExceededException, not
MutationLimitReachedException. "
+ + "When a single mutation exceeds the limit, there's no state to
preserve.");
+ }
+ }
+ MutationState state = conn.getMutationState();
+ assertEquals("Mutation should not have been cleared", 1,
state.getNumRows());
+ }
+ }
+
+ /**
+ * Tests that when preserveOnLimitExceeded=true and the limit is reached
during row merge (same
+ * row key updated multiple times), the mutations are preserved. This
specifically tests the 3rd
+ * case in joinMutationState where rows are merged.
+ */
+ @Test
+ public void testRowMergePreserveOnLimitExceeded() throws Exception {
+ String fullTableName = generateUniqueName();
+
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL);
+ }
+
+ // Low byte limit so merging rows with increasing sizes triggers limit
+ try (PhoenixConnection conn = (PhoenixConnection)
DriverManager.getConnection(getUrl(),
+ createPreserveModeProps(1000, 1500))) {
+ conn.setAutoCommit(false);
+
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " +
fullTableName
+ + " (organization_id, entity_id, score, tags) VALUES (?,?,?,?)");
+
+ int commitCount = 0;
+ int totalUpdates = 0;
+ String baseKey = "ORG000000000001";
+ String entityKey = "ENT000000000001";
+ int maxScore = 50;
+
+ // Keep updating the SAME row with larger values until limit reached
+ // Each update merges with the existing row, increasing the size
+ while (totalUpdates <= maxScore) {
+ try {
+ for (int i = totalUpdates; i <= maxScore; i++) {
+ stmt.setString(1, baseKey);
+ stmt.setString(2, entityKey);
+ stmt.setInt(3, i);
+ // Increasing size with each update - this will eventually exceed
byte limit
+ StringBuilder sb = new StringBuilder("update" + i + "_");
+ for (int j = 0; j < i * 10; j++) {
+ sb.append("X");
+ }
+ stmt.setString(4, sb.toString());
+ stmt.executeUpdate();
+ totalUpdates = i + 1;
+ }
+ conn.commit();
+ commitCount++;
+ break;
+ } catch (MutationLimitReachedException e) {
+ // Mutations should be preserved - verify we have state
+ MutationState state = conn.getMutationState();
+ assertTrue("Mutations should be preserved on limit exceeded",
+ state.getNumRows() > 0 || state.getEstimatedSize() > 0);
+ conn.commit();
+ commitCount++;
+ }
+ }
+
+ // Should have hit limit and committed multiple times due to row merge
size growth
+ assertTrue("Should have committed multiple times", commitCount > 1);
+
+ // Verify the row exists with the latest value
+ try (ResultSet rs = conn.createStatement().executeQuery("SELECT score
FROM " + fullTableName
+ + " WHERE organization_id = '" + baseKey + "' AND entity_id = '" +
entityKey + "'")) {
+ assertTrue("Row should exist", rs.next());
+ // Score should be the last successfully committed value
+ assertEquals("Score should be set to the max value", maxScore,
rs.getInt(1));
+ }
+ }
+ }
+
+ /**
+ * Tests that when preserveOnLimitExceeded=true and the limit is reached
during conflicting row
+ * addition (regular upsert vs ON DUPLICATE KEY upsert on same key), the
mutations are preserved.
+ * This tests the conflict case in joinMutationState.
+ */
+ @Test
+ public void testConflictingRowsPreserveOnLimitExceeded() throws Exception {
+ String fullTableName = generateUniqueName();
+
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL);
+ }
+
+ // Low byte limit so adding conflicting rows triggers limit
+ try (PhoenixConnection conn = (PhoenixConnection)
DriverManager.getConnection(getUrl(),
+ createPreserveModeProps(1000, 2000))) {
+ conn.setAutoCommit(false);
+
+ // Regular UPSERT statement
+ PreparedStatement regularStmt = conn.prepareStatement("UPSERT INTO " +
fullTableName
+ + " (organization_id, entity_id, score, tags) VALUES (?,?,?,?)");
+
+ // ON DUPLICATE KEY UPSERT statement - conflicts with regular upsert
+ PreparedStatement onDupKeyStmt = conn.prepareStatement(
+ "UPSERT INTO " + fullTableName + " (organization_id, entity_id, score)
VALUES (?,?,?)"
+ + " ON DUPLICATE KEY UPDATE score = score + 1");
+
+ int commitCount = 0;
+ int totalRows = 0;
+ String orgKey = "ORG000000000001";
+
+ // Alternate between regular upserts and ON DUPLICATE KEY upserts on
same keys
+ // This creates conflicting rows that can't be merged
+ while (totalRows < 30) {
+ try {
+ for (int i = totalRows; i < 30; i++) {
+ String entityKey = String.format("ENT%012d", i);
+
+ // First do a regular upsert
+ regularStmt.setString(1, orgKey);
+ regularStmt.setString(2, entityKey);
+ regularStmt.setInt(3, i);
+ StringBuilder sb = new StringBuilder("data" + i + "_");
+ for (int j = 0; j < 20; j++) {
+ sb.append("X");
+ }
+ regularStmt.setString(4, sb.toString());
+ regularStmt.executeUpdate();
+
+ // Then do an ON DUPLICATE KEY upsert on the same key - this
conflicts
+ onDupKeyStmt.setString(1, orgKey);
+ onDupKeyStmt.setString(2, entityKey);
+ onDupKeyStmt.setInt(3, i * 10);
+ onDupKeyStmt.executeUpdate();
+
+ totalRows = i + 1;
+ }
+ conn.commit();
+ commitCount++;
+ break;
+ } catch (MutationLimitReachedException e) {
+ // Mutations should be preserved - verify we have state
+ MutationState state = conn.getMutationState();
+ assertTrue("Mutations should be preserved on limit exceeded",
+ state.getNumRows() > 0 || state.getEstimatedSize() > 0);
+ conn.commit();
+ commitCount++;
+ }
+ }
+
+ // Should have hit limit and committed multiple times due to conflicting
row size
+ assertTrue("Should have committed multiple times", commitCount > 1);
+ // Verify rows exist
+ verifyRowCount(conn, fullTableName, totalRows);
+ }
+ }
+
@Test
public void testMutationEstimatedSize() throws Exception {
PhoenixConnection conn = (PhoenixConnection)
DriverManager.getConnection(getUrl());
@@ -380,6 +877,35 @@ public class MutationStateIT extends
ParallelStatsDisabledIT {
stmt.execute();
assertTrue("Mutation state size should decrease",
prevEstimatedSize + 4 > state.getEstimatedSize());
+
+ // Test that estimatedSize increases when adding conflicting rows
+ // (regular upsert + ON DUPLICATE KEY upsert on same key cannot be merged)
+ conn.commit(); // clear state
+ assertEquals("Mutation state size should be zero after commit", 0,
state.getEstimatedSize());
+
+ // Regular upsert
+ stmt = conn.prepareStatement(
+ "upsert into " + fullTableName + " (organization_id, entity_id, score)
values (?,?,?)");
+ stmt.setString(1, "AAAA");
+ stmt.setString(2, "BBBB");
+ stmt.setInt(3, 1);
+ stmt.execute();
+ long sizeAfterRegularUpsert = state.getEstimatedSize();
+ assertTrue("Mutation state size should be > 0 after regular upsert",
+ sizeAfterRegularUpsert > 0);
+
+ // ON DUPLICATE KEY upsert on same key - creates a conflicting row that
can't be merged
+ PreparedStatement onDupStmt = conn.prepareStatement(
+ "upsert into " + fullTableName + " (organization_id, entity_id, score)
values (?,?,?)"
+ + " ON DUPLICATE KEY UPDATE score = score + 1");
+ onDupStmt.setString(1, "AAAA");
+ onDupStmt.setString(2, "BBBB");
+ onDupStmt.setInt(3, 2);
+ onDupStmt.execute();
+
+ // Size should increase because conflicting row goes into a separate batch
+ assertTrue("Estimated size should increase for conflicting row",
+ state.getEstimatedSize() > sizeAfterRegularUpsert);
}
@Test