This is an automated email from the ASF dual-hosted git repository.
richardantal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 9006fafce5 PHOENIX-7198 support for multi row constructors in single upsert query (#2222)
9006fafce5 is described below
commit 9006fafce5e21861aea70fceeb1e66fba631e3d5
Author: richardantal <[email protected]>
AuthorDate: Tue Feb 10 10:26:46 2026 +0100
PHOENIX-7198 support for multi row constructors in single upsert query (#2222)
---
phoenix-core-client/src/main/antlr3/PhoenixSQL.g | 3 +-
.../org/apache/phoenix/compile/UpsertCompiler.java | 334 +++++++++++----------
.../org/apache/phoenix/jdbc/PhoenixStatement.java | 4 +-
.../org/apache/phoenix/parse/ParseNodeFactory.java | 2 +-
.../org/apache/phoenix/parse/UpsertStatement.java | 6 +-
.../apache/phoenix/end2end/MultipleUpsertIT.java | 123 ++++++++
.../org/apache/phoenix/parse/QueryParserTest.java | 36 +++
7 files changed, 347 insertions(+), 161 deletions(-)
diff --git a/phoenix-core-client/src/main/antlr3/PhoenixSQL.g b/phoenix-core-client/src/main/antlr3/PhoenixSQL.g
index b905c43f3e..945d981f26 100644
--- a/phoenix-core-client/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core-client/src/main/antlr3/PhoenixSQL.g
@@ -883,9 +883,10 @@ finally{ contextStack.pop(); }
// Parse a full upsert expression structure.
upsert_node returns [UpsertStatement ret]
+@init{List<List<ParseNode>> v = new ArrayList<List<ParseNode>>(); }
: UPSERT (hint=hintClause)? INTO t=from_table_name
(LPAREN p=upsert_column_refs RPAREN)?
- ((VALUES LPAREN v=one_or_more_expressions RPAREN (
+ ((VALUES LPAREN e = one_or_more_expressions {v.add(e);} RPAREN (COMMA
LPAREN e = one_or_more_expressions {v.add(e);} RPAREN )* (
ON DUPLICATE KEY (
ig=IGNORE
| ( upd=UPDATE pairs=update_column_pairs )
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 7d42690420..b9dfbc33e5 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -26,6 +26,7 @@ import java.sql.ParameterMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
@@ -383,7 +384,7 @@ public class UpsertCompiler {
int[] columnIndexesToBe;
int nColumnsToSet = 0;
int[] pkSlotIndexesToBe;
- List<ParseNode> valueNodes = upsert.getValues();
+ List<List<ParseNode>> valueNodesList = upsert.getValues();
List<PColumn> targetColumns;
NamedTableNode tableNode = upsert.getTable();
String tableName = tableNode.getName().getTableName();
@@ -557,8 +558,12 @@ public class UpsertCompiler {
columnsBeingSet.set(columnIndexesToBe[i] =
rowTimestampCol.getPosition());
pkSlotIndexesToBe[i] = table.getRowTimestampColPos();
targetColumns.add(rowTimestampCol);
- if (valueNodes != null && !valueNodes.isEmpty()) {
- valueNodes.add(getNodeForRowTimestampColumn(rowTimestampCol));
+ if (valueNodesList != null) {
+ for (List<ParseNode> parseNode : valueNodesList) {
+ if (!parseNode.isEmpty()) {
+ parseNode.add(getNodeForRowTimestampColumn(rowTimestampCol));
+ }
+ }
}
nColumnsToSet++;
}
@@ -571,7 +576,7 @@ public class UpsertCompiler {
}
}
boolean isAutoCommit = connection.getAutoCommit();
- if (valueNodes == null) {
+ if (valueNodesList.isEmpty()) {
SelectStatement select = upsert.getSelect();
assert (select != null);
select = SubselectRewriter.flatten(select, connection);
@@ -675,8 +680,8 @@ public class UpsertCompiler {
// Cannot auto commit if doing aggregation or topN or salted
// Salted causes problems because the row may end up living on a
different region
} else {
- nValuesToSet = valueNodes.size() + addViewColumnsToBe.size() +
(isTenantSpecific ? 1 : 0)
- + (isSharedViewIndex ? 1 : 0);
+ nValuesToSet = valueNodesList.get(0).size() + addViewColumnsToBe.size()
+ + (isTenantSpecific ? 1 : 0) + (isSharedViewIndex ? 1 : 0);
}
// Resize down to allow a subset of columns to be specifiable
if (columnNodes.isEmpty() && columnIndexesToBe.length >= nValuesToSet) {
@@ -703,7 +708,7 @@ public class UpsertCompiler {
final QueryPlan originalQueryPlan = queryPlanToBe;
RowProjector projectorToBe = null;
// Optimize only after all checks have been performed
- if (valueNodes == null) {
+ if (valueNodesList.isEmpty()) {
queryPlanToBe = new QueryOptimizer(services).optimize(queryPlanToBe,
statement, targetColumns,
parallelIteratorFactoryToBe);
projectorToBe = queryPlanToBe.getProjector();
@@ -726,7 +731,7 @@ public class UpsertCompiler {
////////////////////////////////////////////////////////////////////
// UPSERT SELECT
/////////////////////////////////////////////////////////////////////
- if (valueNodes == null) {
+ if (valueNodesList.isEmpty()) {
// Before we re-order, check that for updatable view columns
// the projected expression either matches the column name or
// is a constant with the same required value.
@@ -851,101 +856,113 @@ public class UpsertCompiler {
////////////////////////////////////////////////////////////////////
// UPSERT VALUES
/////////////////////////////////////////////////////////////////////
- final byte[][] values = new byte[nValuesToSet][];
- int nodeIndex = 0;
- if (isSharedViewIndex) {
- values[nodeIndex++] =
table.getviewIndexIdType().toBytes(table.getViewIndexId());
- }
- if (isTenantSpecific) {
- PName tenantId = connection.getTenantId();
- values[nodeIndex++] = ScanUtil.getTenantIdBytes(table.getRowKeySchema(),
- table.getBucketNum() != null, tenantId, isSharedViewIndex);
- }
-
- final int nodeIndexOffset = nodeIndex;
- // Allocate array based on size of all columns in table,
- // since some values may not be set (if they're nullable).
- final StatementContext context =
+ StatementContext context =
new StatementContext(statement, resolver, new Scan(), new
SequenceManager(statement));
- UpsertValuesCompiler expressionBuilder = new UpsertValuesCompiler(context);
- final List<Expression> constantExpressions =
- Lists.newArrayListWithExpectedSize(valueNodes.size());
- // First build all the expressions, as with sequences we want to collect
them all first
- // and initialize them in one batch
- List<Pair<ColumnName, ParseNode>> jsonExpressions = Lists.newArrayList();
- List<Pair<ColumnName, ParseNode>> nonPKColumns = Lists.newArrayList();
- for (ParseNode valueNode : valueNodes) {
- if (!valueNode.hasJsonExpression() && !valueNode.isStateless()) {
- throw new
SQLExceptionInfo.Builder(SQLExceptionCode.VALUE_IN_UPSERT_NOT_CONSTANT).build()
- .buildException();
- }
- PColumn column = allColumns.get(columnIndexes[nodeIndex]);
- expressionBuilder.setColumn(column);
- Expression expression = valueNode.accept(expressionBuilder);
- if (
- expression.getDataType() != null
- && !expression.getDataType().isCastableTo(column.getDataType())
- ) {
- throw TypeMismatchException.newException(expression.getDataType(),
column.getDataType(),
- "expression: " + expression.toString() + " in column " + column);
+ List<byte[][]> valuesList = new ArrayList<>();
+ int nodeIndexOffset = 0;
+ List<List<Expression>> constantExpressionsList = new ArrayList<>();
+
+ byte[] onDupKeyBytes = null;
+ OnDuplicateKeyType onDupKeyType = null;
+
+ for (List<ParseNode> valueNodesItem : valueNodesList) {
+ byte[][] values = new byte[nValuesToSet][];
+ int nodeIndex = 0;
+ if (isSharedViewIndex) {
+ values[nodeIndex++] =
table.getviewIndexIdType().toBytes(table.getViewIndexId());
}
- if (!SchemaUtil.isPKColumn(column) && !valueNode.hasJsonExpression()) {
- nonPKColumns
- .add(new
Pair<>(ColumnName.caseSensitiveColumnName(column.getFamilyName().getString(),
- column.getName().getString()), valueNode));
- } else if (valueNode.hasJsonExpression()) {
- jsonExpressions
- .add(new
Pair<>(ColumnName.caseSensitiveColumnName(column.getFamilyName().getString(),
- column.getName().getString()), valueNode));
+ if (isTenantSpecific) {
+ PName tenantId = connection.getTenantId();
+ values[nodeIndex++] =
ScanUtil.getTenantIdBytes(table.getRowKeySchema(),
+ table.getBucketNum() != null, tenantId, isSharedViewIndex);
}
- constantExpressions.add(expression);
- nodeIndex++;
- }
- if (nonPKColumns.size() > 0 && jsonExpressions.size() > 0) {
- jsonExpressions.addAll(nonPKColumns);
- nonPKColumns.clear();
- }
- byte[] onDupKeyBytesToBe = null;
- List<Pair<ColumnName, ParseNode>> onDupKeyPairs =
upsert.getOnDupKeyPairs();
- OnDuplicateKeyType onDupKeyType = upsert.getOnDupKeyType();
- if (onDupKeyPairs != null) {
- if (table.isImmutableRows()) {
- throw new
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_IMMUTABLE)
- .setSchemaName(table.getSchemaName().getString())
-
.setTableName(table.getTableName().getString()).build().buildException();
- }
- if (table.isTransactional()) {
- throw new
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_TRANSACTIONAL)
- .setSchemaName(table.getSchemaName().getString())
-
.setTableName(table.getTableName().getString()).build().buildException();
+ nodeIndexOffset = nodeIndex;
+ // Allocate array based on size of all columns in table,
+ // since some values may not be set (if they're nullable).
+ UpsertValuesCompiler expressionBuilder = new
UpsertValuesCompiler(context);
+ List<Expression> constantExpressions =
+ Lists.newArrayListWithExpectedSize(valueNodesItem.size());
+ // First build all the expressions, as with sequences we want to collect
them all first
+ // and initialize them in one batch
+ List<Pair<ColumnName, ParseNode>> jsonExpressions = Lists.newArrayList();
+ List<Pair<ColumnName, ParseNode>> nonPKColumns = Lists.newArrayList();
+ for (ParseNode valueNode : valueNodesItem) {
+ if (!valueNode.hasJsonExpression() && !valueNode.isStateless()) {
+ throw new
SQLExceptionInfo.Builder(SQLExceptionCode.VALUE_IN_UPSERT_NOT_CONSTANT).build()
+ .buildException();
+ }
+ PColumn column = allColumns.get(columnIndexes[nodeIndex]);
+ expressionBuilder.setColumn(column);
+ Expression expression = valueNode.accept(expressionBuilder);
+ if (
+ expression.getDataType() != null
+ && !expression.getDataType().isCastableTo(column.getDataType())
+ ) {
+ throw TypeMismatchException.newException(expression.getDataType(),
column.getDataType(),
+ "expression: " + expression.toString() + " in column " + column);
+ }
+ if (!SchemaUtil.isPKColumn(column) && !valueNode.hasJsonExpression()) {
+ nonPKColumns
+ .add(new
Pair<>(ColumnName.caseSensitiveColumnName(column.getFamilyName().getString(),
+ column.getName().getString()), valueNode));
+ } else if (valueNode.hasJsonExpression()) {
+ jsonExpressions
+ .add(new
Pair<>(ColumnName.caseSensitiveColumnName(column.getFamilyName().getString(),
+ column.getName().getString()), valueNode));
+ }
+ constantExpressions.add(expression);
+ nodeIndex++;
}
- if (connection.getSCN() != null) {
- throw new
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_SCN_IN_ON_DUP_KEY)
- .setSchemaName(table.getSchemaName().getString())
-
.setTableName(table.getTableName().getString()).build().buildException();
+ if (nonPKColumns.size() > 0 && jsonExpressions.size() > 0) {
+ jsonExpressions.addAll(nonPKColumns);
+ nonPKColumns.clear();
}
-
- switch (onDupKeyType) {
- case IGNORE: {
- onDupKeyBytesToBe =
PhoenixIndexBuilderHelper.serializeOnDupKeyIgnore();
- break;
+ byte[] onDupKeyBytesToBe = null;
+ List<Pair<ColumnName, ParseNode>> onDupKeyPairs =
upsert.getOnDupKeyPairs();
+ onDupKeyType = upsert.getOnDupKeyType();
+
+ if (onDupKeyPairs != null) {
+ if (table.isImmutableRows()) {
+ throw new
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_IMMUTABLE)
+ .setSchemaName(table.getSchemaName().getString())
+
.setTableName(table.getTableName().getString()).build().buildException();
}
- case UPDATE:
- case UPDATE_ONLY: {
- onDupKeyBytesToBe = getOnDuplicateKeyBytes(table, context,
onDupKeyPairs, resolver);
- break;
+ if (table.isTransactional()) {
+ throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_TRANSACTIONAL)
+ .setSchemaName(table.getSchemaName().getString())
+
.setTableName(table.getTableName().getString()).build().buildException();
}
- default:
- break;
+ if (connection.getSCN() != null) {
+ throw new
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_SCN_IN_ON_DUP_KEY)
+ .setSchemaName(table.getSchemaName().getString())
+
.setTableName(table.getTableName().getString()).build().buildException();
+ }
+
+ switch (onDupKeyType) {
+ case IGNORE: {
+ onDupKeyBytesToBe =
PhoenixIndexBuilderHelper.serializeOnDupKeyIgnore();
+ break;
+ }
+ case UPDATE:
+ case UPDATE_ONLY: {
+ onDupKeyBytesToBe = getOnDuplicateKeyBytes(table, context,
onDupKeyPairs, resolver);
+ break;
+ }
+ default:
+ break;
+ }
+ } else if (!jsonExpressions.isEmpty()) {
+ onDupKeyBytesToBe = getOnDuplicateKeyBytes(table, context,
jsonExpressions, resolver);
}
- } else if (!jsonExpressions.isEmpty()) {
- onDupKeyBytesToBe = getOnDuplicateKeyBytes(table, context,
jsonExpressions, resolver);
+ onDupKeyBytes = onDupKeyBytesToBe;
+ valuesList.add(values);
+ constantExpressionsList.add(constantExpressions);
}
- final byte[] onDupKeyBytes = onDupKeyBytesToBe;
- return new UpsertValuesMutationPlan(context, tableRef, nodeIndexOffset,
constantExpressions,
- allColumns, columnIndexes, overlapViewColumns, values, addViewColumns,
connection,
+ return new UpsertValuesMutationPlan(context, tableRef, nodeIndexOffset,
constantExpressionsList,
+ allColumns, columnIndexes, overlapViewColumns, valuesList,
addViewColumns, connection,
pkSlotIndexes, useServerTimestamp, onDupKeyBytes, onDupKeyType, maxSize,
maxSizeBytes);
}
@@ -1252,11 +1269,11 @@ public class UpsertCompiler {
private final StatementContext context;
private final TableRef tableRef;
private final int nodeIndexOffset;
- private final List<Expression> constantExpressions;
+ private final List<List<Expression>> constantExpressionsList;
private final List<PColumn> allColumns;
private final int[] columnIndexes;
private final Set<PColumn> overlapViewColumns;
- private final byte[][] values;
+ private final List<byte[][]> valuesList;
private final Set<PColumn> addViewColumns;
private final PhoenixConnection connection;
private final int[] pkSlotIndexes;
@@ -1267,19 +1284,19 @@ public class UpsertCompiler {
private final long maxSizeBytes;
public UpsertValuesMutationPlan(StatementContext context, TableRef
tableRef,
- int nodeIndexOffset, List<Expression> constantExpressions, List<PColumn>
allColumns,
- int[] columnIndexes, Set<PColumn> overlapViewColumns, byte[][] values,
+ int nodeIndexOffset, List<List<Expression>> constantExpressionsList,
List<PColumn> allColumns,
+ int[] columnIndexes, Set<PColumn> overlapViewColumns, List<byte[][]>
valuesList,
Set<PColumn> addViewColumns, PhoenixConnection connection, int[]
pkSlotIndexes,
boolean useServerTimestamp, byte[] onDupKeyBytes, OnDuplicateKeyType
onDupKeyType,
int maxSize, long maxSizeBytes) {
this.context = context;
this.tableRef = tableRef;
this.nodeIndexOffset = nodeIndexOffset;
- this.constantExpressions = constantExpressions;
+ this.constantExpressionsList = constantExpressionsList;
this.allColumns = allColumns;
this.columnIndexes = columnIndexes;
this.overlapViewColumns = overlapViewColumns;
- this.values = values;
+ this.valuesList = valuesList;
this.addViewColumns = addViewColumns;
this.connection = connection;
this.pkSlotIndexes = pkSlotIndexes;
@@ -1325,74 +1342,83 @@ public class UpsertCompiler {
ImmutableBytesWritable ptr = context.getTempPtr();
final SequenceManager sequenceManager = context.getSequenceManager();
// Next evaluate all the expressions
- int nodeIndex = nodeIndexOffset;
PTable table = tableRef.getTable();
Tuple tuple =
sequenceManager.getSequenceCount() == 0 ? null :
sequenceManager.newSequenceTuple(null);
- for (Expression constantExpression : constantExpressions) {
- if (!constantExpression.isStateless()) {
- nodeIndex++;
- continue;
- }
- PColumn column = allColumns.get(columnIndexes[nodeIndex]);
- constantExpression.evaluate(tuple, ptr);
- Object value = null;
- if (constantExpression.getDataType() != null) {
- value = constantExpression.getDataType().toObject(ptr,
constantExpression.getSortOrder(),
- constantExpression.getMaxLength(), constantExpression.getScale());
- if
(!constantExpression.getDataType().isCoercibleTo(column.getDataType(), value)) {
- throw
TypeMismatchException.newException(constantExpression.getDataType(),
- column.getDataType(),
- "expression: " + constantExpression.toString() + " in column " +
column);
+ for (int index = 0; index < valuesList.size(); index++) {
+ byte[][] valuesListItem = valuesList.get(index);
+ int nodeIndex = nodeIndexOffset;
+ for (Expression constantExpression :
constantExpressionsList.get(index)) {
+ if (!constantExpression.isStateless()) {
+ nodeIndex++;
+ continue;
+ }
+ PColumn column = allColumns.get(columnIndexes[nodeIndex]);
+ constantExpression.evaluate(tuple, ptr);
+ Object value = null;
+ if (constantExpression.getDataType() != null) {
+ value =
+ constantExpression.getDataType().toObject(ptr,
constantExpression.getSortOrder(),
+ constantExpression.getMaxLength(),
constantExpression.getScale());
+ if
(!constantExpression.getDataType().isCoercibleTo(column.getDataType(), value)) {
+ throw
TypeMismatchException.newException(constantExpression.getDataType(),
+ column.getDataType(),
+ "expression: " + constantExpression.toString() + " in column "
+ column);
+ }
+ if (
+ !column.getDataType().isSizeCompatible(ptr, value,
constantExpression.getDataType(),
+ constantExpression.getSortOrder(),
constantExpression.getMaxLength(),
+ constantExpression.getScale(), column.getMaxLength(),
column.getScale())
+ ) {
+ throw new DataExceedsCapacityException(column.getDataType(),
column.getMaxLength(),
+ column.getScale(), column.getName().getString());
+ }
}
+ column.getDataType().coerceBytes(ptr, value,
constantExpression.getDataType(),
+ constantExpression.getMaxLength(), constantExpression.getScale(),
+ constantExpression.getSortOrder(), column.getMaxLength(),
column.getScale(),
+ column.getSortOrder(), table.rowKeyOrderOptimizable());
if (
- !column.getDataType().isSizeCompatible(ptr, value,
constantExpression.getDataType(),
- constantExpression.getSortOrder(),
constantExpression.getMaxLength(),
- constantExpression.getScale(), column.getMaxLength(),
column.getScale())
+ overlapViewColumns.contains(column)
+ && Bytes.compareTo(ptr.get(), ptr.getOffset(), ptr.getLength(),
+ column.getViewConstant(), 0, column.getViewConstant().length -
1) != 0
) {
- throw new DataExceedsCapacityException(column.getDataType(),
column.getMaxLength(),
- column.getScale(), column.getName().getString());
+ throw new
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_UPDATE_VIEW_COLUMN)
+ .setColumnName(column.getName().getString())
+ .setMessage("value=" +
constantExpression.toString()).build().buildException();
}
+ valuesListItem[nodeIndex] = ByteUtil.copyKeyBytesIfNecessary(ptr);
+ nodeIndex++;
}
- column.getDataType().coerceBytes(ptr, value,
constantExpression.getDataType(),
- constantExpression.getMaxLength(), constantExpression.getScale(),
- constantExpression.getSortOrder(), column.getMaxLength(),
column.getScale(),
- column.getSortOrder(), table.rowKeyOrderOptimizable());
- if (
- overlapViewColumns.contains(column) && Bytes.compareTo(ptr.get(),
ptr.getOffset(),
- ptr.getLength(), column.getViewConstant(), 0,
column.getViewConstant().length - 1) != 0
- ) {
- throw new
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_UPDATE_VIEW_COLUMN)
- .setColumnName(column.getName().getString())
- .setMessage("value=" +
constantExpression.toString()).build().buildException();
+ // Add columns based on view
+ for (PColumn column : addViewColumns) {
+ if (IndexUtil.getViewConstantValue(column, ptr)) {
+ valuesListItem[nodeIndex++] =
ByteUtil.copyKeyBytesIfNecessary(ptr);
+ } else {
+ throw new IllegalStateException();
+ }
}
- values[nodeIndex] = ByteUtil.copyKeyBytesIfNecessary(ptr);
- nodeIndex++;
}
- // Add columns based on view
- for (PColumn column : addViewColumns) {
- if (IndexUtil.getViewConstantValue(column, ptr)) {
- values[nodeIndex++] = ByteUtil.copyKeyBytesIfNecessary(ptr);
- } else {
- throw new IllegalStateException();
+
+ MultiRowMutationState mutation = new
MultiRowMutationState(valuesList.size());
+ for (byte[][] valuesListItems : valuesList) {
+ IndexMaintainer indexMaintainer = null;
+ byte[][] viewConstants = null;
+ if (table.getIndexType() == IndexType.LOCAL) {
+ PTable parentTable = statement.getConnection().getMetaDataCache()
+ .getTableRef(new PTableKey(statement.getConnection().getTenantId(),
+ table.getParentName().getString()))
+ .getTable();
+ indexMaintainer = table.getIndexMaintainer(parentTable, connection);
+ viewConstants = IndexUtil.getViewConstants(parentTable);
}
+ int maxHBaseClientKeyValueSize =
statement.getConnection().getQueryServices().getProps()
+ .getInt(QueryServices.HBASE_CLIENT_KEYVALUE_MAXSIZE,
+ QueryServicesOptions.DEFAULT_HBASE_CLIENT_KEYVALUE_MAXSIZE);
+ setValues(valuesListItems, pkSlotIndexes, columnIndexes, table,
mutation, statement,
+ useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes,
onDupKeyType, 0,
+ maxHBaseClientKeyValueSize);
}
- MultiRowMutationState mutation = new MultiRowMutationState(1);
- IndexMaintainer indexMaintainer = null;
- byte[][] viewConstants = null;
- if (table.getIndexType() == IndexType.LOCAL) {
- PTable parentTable =
statement.getConnection().getMetaDataCache().getTableRef(
- new PTableKey(statement.getConnection().getTenantId(),
table.getParentName().getString()))
- .getTable();
- indexMaintainer = table.getIndexMaintainer(parentTable, connection);
- viewConstants = IndexUtil.getViewConstants(parentTable);
- }
- int maxHBaseClientKeyValueSize =
statement.getConnection().getQueryServices().getProps()
- .getInt(QueryServices.HBASE_CLIENT_KEYVALUE_MAXSIZE,
- QueryServicesOptions.DEFAULT_HBASE_CLIENT_KEYVALUE_MAXSIZE);
- setValues(values, pkSlotIndexes, columnIndexes, table, mutation,
statement,
- useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes,
onDupKeyType, 0,
- maxHBaseClientKeyValueSize);
return new MutationState(tableRef, mutation, 0, maxSize, maxSizeBytes,
connection);
}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index b527ecd042..0380ae8a1c 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -1182,7 +1182,7 @@ public class PhoenixStatement implements
PhoenixMonitoredStatement, SQLCloseable
implements CompilableStatement {
private ExecutableUpsertStatement(NamedTableNode table, HintNode hintNode,
- List<ColumnName> columns, List<ParseNode> values, SelectStatement
select, int bindCount,
+ List<ColumnName> columns, List<List<ParseNode>> values, SelectStatement
select, int bindCount,
Map<String, UDFParseNode> udfParseNodes, List<Pair<ColumnName,
ParseNode>> onDupKeyPairs,
OnDuplicateKeyType onDupKeyType, boolean returningRow) {
super(table, hintNode, columns, values, select, bindCount,
udfParseNodes, onDupKeyPairs,
@@ -2154,7 +2154,7 @@ public class PhoenixStatement implements
PhoenixMonitoredStatement, SQLCloseable
@Override
public ExecutableUpsertStatement upsert(NamedTableNode table, HintNode
hintNode,
- List<ColumnName> columns, List<ParseNode> values, SelectStatement
select, int bindCount,
+ List<ColumnName> columns, List<List<ParseNode>> values, SelectStatement
select, int bindCount,
Map<String, UDFParseNode> udfParseNodes, List<Pair<ColumnName,
ParseNode>> onDupKeyPairs,
UpsertStatement.OnDuplicateKeyType onDupKeyType, boolean returningRow) {
return new ExecutableUpsertStatement(table, hintNode, columns, values,
select, bindCount,
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 716dbd566a..f3199a01ee 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -912,7 +912,7 @@ public class ParseNodeFactory {
}
public UpsertStatement upsert(NamedTableNode table, HintNode hint,
List<ColumnName> columns,
- List<ParseNode> values, SelectStatement select, int bindCount,
+ List<List<ParseNode>> values, SelectStatement select, int bindCount,
Map<String, UDFParseNode> udfParseNodes, List<Pair<ColumnName, ParseNode>>
onDupKeyPairs,
UpsertStatement.OnDuplicateKeyType onDupKeyType, boolean returningRow) {
return new UpsertStatement(table, hint, columns, values, select,
bindCount, udfParseNodes,
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/UpsertStatement.java b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
index d89b48bdb5..ade24f2aef 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
@@ -32,7 +32,7 @@ public class UpsertStatement extends DMLStatement implements
RowReturningDMLStat
}
private final List<ColumnName> columns;
- private final List<ParseNode> values;
+ private final List<List<ParseNode>> values;
private final SelectStatement select;
private final HintNode hint;
private final List<Pair<ColumnName, ParseNode>> onDupKeyPairs;
@@ -40,7 +40,7 @@ public class UpsertStatement extends DMLStatement implements
RowReturningDMLStat
private final boolean returningRow;
public UpsertStatement(NamedTableNode table, HintNode hint, List<ColumnName>
columns,
- List<ParseNode> values, SelectStatement select, int bindCount,
+ List<List<ParseNode>> values, SelectStatement select, int bindCount,
Map<String, UDFParseNode> udfParseNodes, List<Pair<ColumnName, ParseNode>>
onDupKeyPairs,
OnDuplicateKeyType onDupKeyType, boolean returningRow) {
super(table, bindCount, udfParseNodes);
@@ -57,7 +57,7 @@ public class UpsertStatement extends DMLStatement implements
RowReturningDMLStat
return columns;
}
- public List<ParseNode> getValues() {
+ public List<List<ParseNode>> getValues() {
return values;
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultipleUpsertIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultipleUpsertIT.java
new file mode 100644
index 0000000000..e12cc0e922
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultipleUpsertIT.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.util.Properties;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(ParallelStatsDisabledTest.class)
+public class MultipleUpsertIT extends ParallelStatsDisabledIT {
+ @Test
+ public void testUpsertMultiple() throws Exception {
+ Properties props = new Properties();
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String tableName = generateUniqueName();
+ String ddl =
+ "CREATE TABLE " + tableName + "(K VARCHAR NOT NULL PRIMARY KEY, INT
INTEGER, INT2 INTEGER)";
+ conn.createStatement().execute(ddl);
+
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('A',
11, 12)");
+ conn.createStatement().execute("UPSERT INTO " + tableName + "(K, INT)
VALUES ('B', 2)");
+ conn.createStatement()
+ .execute("UPSERT INTO " + tableName + "(K, INT, INT2) VALUES ('E', 5,
5),('F', 61, 6)");
+ conn.createStatement()
+ .execute("UPSERT INTO " + tableName + " VALUES ('C', 31, 32),('D', 41,
42)");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('G',
7, 72),('H', 8)");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('I',
9),('I', 10)");
+ conn.commit();
+
+ ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM "
+ tableName);
+ assertTrue(rs.next());
+ assertEquals(9, rs.getInt(1));
+
+ rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + "
ORDER BY K");
+ rs.next();
+ assertEquals(rs.getString(1), "A");
+ assertEquals(rs.getInt(2), 11);
+ assertEquals(rs.getInt(3), 12);
+ rs.next();
+ assertEquals(rs.getString(1), "B");
+ assertEquals(rs.getInt(2), 2);
+ rs.next();
+ assertEquals(rs.getString(1), "C");
+ assertEquals(rs.getInt(2), 31);
+ assertEquals(rs.getInt(3), 32);
+ rs.next();
+ assertEquals(rs.getString(1), "D");
+ assertEquals(rs.getInt(2), 41);
+ assertEquals(rs.getInt(3), 42);
+ rs.next();
+ assertEquals(rs.getString(1), "E");
+ assertEquals(rs.getInt(2), 5);
+ assertEquals(rs.getInt(3), 5);
+ rs.next();
+ assertEquals(rs.getString(1), "F");
+ assertEquals(rs.getInt(2), 61);
+ assertEquals(rs.getInt(3), 6);
+ rs.next();
+ assertEquals(rs.getString(1), "G");
+ assertEquals(rs.getInt(2), 7);
+ assertEquals(rs.getInt(3), 72);
+ rs.next();
+ assertEquals(rs.getString(1), "H");
+ assertEquals(rs.getInt(2), 8);
+ rs.next();
+ assertEquals(rs.getString(1), "I");
+ assertEquals(rs.getInt(2), 10);
+ }
+
+ @Test
+ public void testUpsertMultiple2() throws Exception {
+ Properties props = new Properties();
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String tableName = generateUniqueName();
+ String ddl = "CREATE TABLE " + tableName + "(K VARCHAR NOT NULL PRIMARY
KEY, INT INTEGER)";
+ conn.createStatement().execute(ddl);
+
+ conn.createStatement()
+ .execute("UPSERT INTO " + tableName + " VALUES ('A',
1),(SUBSTR('APPLE',0,2), 2*2)");
+ conn.createStatement()
+ .execute("UPSERT INTO " + tableName + " VALUES (SUBSTR('DELTA',0,1),
5),('C', 2*3)");
+ conn.commit();
+
+ ResultSet rs =
+ conn.createStatement().executeQuery("SELECT * FROM " + tableName + "
ORDER BY K");
+ rs.next();
+ assertEquals(rs.getString(1), "A");
+ assertEquals(rs.getInt(2), 1);
+ rs.next();
+ assertEquals(rs.getString(1), "AP");
+ assertEquals(rs.getInt(2), 4);
+ rs.next();
+ assertEquals(rs.getString(1), "C");
+ assertEquals(rs.getInt(2), 6);
+ rs.next();
+ assertEquals(rs.getString(1), "D");
+ assertEquals(rs.getInt(2), 5);
+ assertFalse(rs.next());
+
+ }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java b/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java
index 1b5c8008f7..94deefc798 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java
@@ -765,6 +765,42 @@ public class QueryParserTest {
parseQuery(sql);
}
+ @Test
+ public void testValidMultipleUpsert() throws Exception {
+ String sql = (("upsert into t VALUES(1,2),(3,4)"));
+ parseQuery(sql);
+ }
+
+ @Test
+ public void testValidMultipleUpsert2() throws Exception {
+ String sql = "upsert into t(a,b) VALUES(1,2),(3,4)";
+ parseQuery(sql);
+ }
+
+ @Test
+ public void testValidMultipleUpsert3() throws Exception {
+ String sql = (("upsert into t(a,b) VALUES(1,2),(3,4),"));
+ parseQueryThatShouldFail(sql);
+ }
+
+ @Test
+ public void testValidMultipleUpsert4() throws Exception {
+ String sql = (("upsert into t(a,b) VALUES()"));
+ parseQueryThatShouldFail(sql);
+ }
+
+ @Test
+ public void testValidMultipleUpsert5() throws Exception {
+ String sql = (("upsert into t(a,b) VALUES(1,2)(3,4)"));
+ parseQueryThatShouldFail(sql);
+ }
+
+ @Test
+ public void testValidMultipleUpsert6() throws Exception {
+ String sql = (("upsert into t(a,b) VALUES(1,2),(3,4"));
+ parseQueryThatShouldFail(sql);
+ }
+
@Test
public void testDeleteInvalidReturningRow() throws Exception {
String sql = "DELETE FROM T RETURNING PK1";