This is an automated email from the ASF dual-hosted git repository.

richardantal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 9006fafce5 PHOENIX-7198 support for multi row constructors in single 
upsert query (#2222)
9006fafce5 is described below

commit 9006fafce5e21861aea70fceeb1e66fba631e3d5
Author: richardantal <[email protected]>
AuthorDate: Tue Feb 10 10:26:46 2026 +0100

    PHOENIX-7198 support for multi row constructors in single upsert query 
(#2222)
---
 phoenix-core-client/src/main/antlr3/PhoenixSQL.g   |   3 +-
 .../org/apache/phoenix/compile/UpsertCompiler.java | 334 +++++++++++----------
 .../org/apache/phoenix/jdbc/PhoenixStatement.java  |   4 +-
 .../org/apache/phoenix/parse/ParseNodeFactory.java |   2 +-
 .../org/apache/phoenix/parse/UpsertStatement.java  |   6 +-
 .../apache/phoenix/end2end/MultipleUpsertIT.java   | 123 ++++++++
 .../org/apache/phoenix/parse/QueryParserTest.java  |  36 +++
 7 files changed, 347 insertions(+), 161 deletions(-)

diff --git a/phoenix-core-client/src/main/antlr3/PhoenixSQL.g 
b/phoenix-core-client/src/main/antlr3/PhoenixSQL.g
index b905c43f3e..945d981f26 100644
--- a/phoenix-core-client/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core-client/src/main/antlr3/PhoenixSQL.g
@@ -883,9 +883,10 @@ finally{ contextStack.pop(); }
 
 // Parse a full upsert expression structure.
 upsert_node returns [UpsertStatement ret]
+@init{List<List<ParseNode>> v = new ArrayList<List<ParseNode>>(); }
     :   UPSERT (hint=hintClause)? INTO t=from_table_name
         (LPAREN p=upsert_column_refs RPAREN)?
-        ((VALUES LPAREN v=one_or_more_expressions RPAREN (
+        ((VALUES LPAREN e = one_or_more_expressions {v.add(e);} RPAREN (COMMA 
LPAREN e = one_or_more_expressions {v.add(e);} RPAREN )* (
             ON DUPLICATE KEY (
                 ig=IGNORE
               | ( upd=UPDATE pairs=update_column_pairs )
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 7d42690420..b9dfbc33e5 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -26,6 +26,7 @@ import java.sql.ParameterMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
@@ -383,7 +384,7 @@ public class UpsertCompiler {
     int[] columnIndexesToBe;
     int nColumnsToSet = 0;
     int[] pkSlotIndexesToBe;
-    List<ParseNode> valueNodes = upsert.getValues();
+    List<List<ParseNode>> valueNodesList = upsert.getValues();
     List<PColumn> targetColumns;
     NamedTableNode tableNode = upsert.getTable();
     String tableName = tableNode.getName().getTableName();
@@ -557,8 +558,12 @@ public class UpsertCompiler {
         columnsBeingSet.set(columnIndexesToBe[i] = 
rowTimestampCol.getPosition());
         pkSlotIndexesToBe[i] = table.getRowTimestampColPos();
         targetColumns.add(rowTimestampCol);
-        if (valueNodes != null && !valueNodes.isEmpty()) {
-          valueNodes.add(getNodeForRowTimestampColumn(rowTimestampCol));
+        if (valueNodesList != null) {
+          for (List<ParseNode> parseNode : valueNodesList) {
+            if (!parseNode.isEmpty()) {
+              parseNode.add(getNodeForRowTimestampColumn(rowTimestampCol));
+            }
+          }
         }
         nColumnsToSet++;
       }
@@ -571,7 +576,7 @@ public class UpsertCompiler {
       }
     }
     boolean isAutoCommit = connection.getAutoCommit();
-    if (valueNodes == null) {
+    if (valueNodesList.isEmpty()) {
       SelectStatement select = upsert.getSelect();
       assert (select != null);
       select = SubselectRewriter.flatten(select, connection);
@@ -675,8 +680,8 @@ public class UpsertCompiler {
       // Cannot auto commit if doing aggregation or topN or salted
       // Salted causes problems because the row may end up living on a 
different region
     } else {
-      nValuesToSet = valueNodes.size() + addViewColumnsToBe.size() + 
(isTenantSpecific ? 1 : 0)
-        + (isSharedViewIndex ? 1 : 0);
+      nValuesToSet = valueNodesList.get(0).size() + addViewColumnsToBe.size()
+        + (isTenantSpecific ? 1 : 0) + (isSharedViewIndex ? 1 : 0);
     }
     // Resize down to allow a subset of columns to be specifiable
     if (columnNodes.isEmpty() && columnIndexesToBe.length >= nValuesToSet) {
@@ -703,7 +708,7 @@ public class UpsertCompiler {
     final QueryPlan originalQueryPlan = queryPlanToBe;
     RowProjector projectorToBe = null;
     // Optimize only after all checks have been performed
-    if (valueNodes == null) {
+    if (valueNodesList.isEmpty()) {
       queryPlanToBe = new QueryOptimizer(services).optimize(queryPlanToBe, 
statement, targetColumns,
         parallelIteratorFactoryToBe);
       projectorToBe = queryPlanToBe.getProjector();
@@ -726,7 +731,7 @@ public class UpsertCompiler {
     ////////////////////////////////////////////////////////////////////
     // UPSERT SELECT
     /////////////////////////////////////////////////////////////////////
-    if (valueNodes == null) {
+    if (valueNodesList.isEmpty()) {
       // Before we re-order, check that for updatable view columns
       // the projected expression either matches the column name or
       // is a constant with the same required value.
@@ -851,101 +856,113 @@ public class UpsertCompiler {
     ////////////////////////////////////////////////////////////////////
     // UPSERT VALUES
     /////////////////////////////////////////////////////////////////////
-    final byte[][] values = new byte[nValuesToSet][];
-    int nodeIndex = 0;
-    if (isSharedViewIndex) {
-      values[nodeIndex++] = 
table.getviewIndexIdType().toBytes(table.getViewIndexId());
-    }
-    if (isTenantSpecific) {
-      PName tenantId = connection.getTenantId();
-      values[nodeIndex++] = ScanUtil.getTenantIdBytes(table.getRowKeySchema(),
-        table.getBucketNum() != null, tenantId, isSharedViewIndex);
-    }
-
-    final int nodeIndexOffset = nodeIndex;
-    // Allocate array based on size of all columns in table,
-    // since some values may not be set (if they're nullable).
-    final StatementContext context =
+    StatementContext context =
       new StatementContext(statement, resolver, new Scan(), new 
SequenceManager(statement));
-    UpsertValuesCompiler expressionBuilder = new UpsertValuesCompiler(context);
-    final List<Expression> constantExpressions =
-      Lists.newArrayListWithExpectedSize(valueNodes.size());
-    // First build all the expressions, as with sequences we want to collect 
them all first
-    // and initialize them in one batch
-    List<Pair<ColumnName, ParseNode>> jsonExpressions = Lists.newArrayList();
-    List<Pair<ColumnName, ParseNode>> nonPKColumns = Lists.newArrayList();
-    for (ParseNode valueNode : valueNodes) {
-      if (!valueNode.hasJsonExpression() && !valueNode.isStateless()) {
-        throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.VALUE_IN_UPSERT_NOT_CONSTANT).build()
-          .buildException();
-      }
-      PColumn column = allColumns.get(columnIndexes[nodeIndex]);
-      expressionBuilder.setColumn(column);
-      Expression expression = valueNode.accept(expressionBuilder);
-      if (
-        expression.getDataType() != null
-          && !expression.getDataType().isCastableTo(column.getDataType())
-      ) {
-        throw TypeMismatchException.newException(expression.getDataType(), 
column.getDataType(),
-          "expression: " + expression.toString() + " in column " + column);
+    List<byte[][]> valuesList = new ArrayList<>();
+    int nodeIndexOffset = 0;
+    List<List<Expression>> constantExpressionsList = new ArrayList<>();
+
+    byte[] onDupKeyBytes = null;
+    OnDuplicateKeyType onDupKeyType = null;
+
+    for (List<ParseNode> valueNodesItem : valueNodesList) {
+      byte[][] values = new byte[nValuesToSet][];
+      int nodeIndex = 0;
+      if (isSharedViewIndex) {
+        values[nodeIndex++] = 
table.getviewIndexIdType().toBytes(table.getViewIndexId());
       }
-      if (!SchemaUtil.isPKColumn(column) && !valueNode.hasJsonExpression()) {
-        nonPKColumns
-          .add(new 
Pair<>(ColumnName.caseSensitiveColumnName(column.getFamilyName().getString(),
-            column.getName().getString()), valueNode));
-      } else if (valueNode.hasJsonExpression()) {
-        jsonExpressions
-          .add(new 
Pair<>(ColumnName.caseSensitiveColumnName(column.getFamilyName().getString(),
-            column.getName().getString()), valueNode));
+      if (isTenantSpecific) {
+        PName tenantId = connection.getTenantId();
+        values[nodeIndex++] = 
ScanUtil.getTenantIdBytes(table.getRowKeySchema(),
+          table.getBucketNum() != null, tenantId, isSharedViewIndex);
       }
-      constantExpressions.add(expression);
-      nodeIndex++;
-    }
-    if (nonPKColumns.size() > 0 && jsonExpressions.size() > 0) {
-      jsonExpressions.addAll(nonPKColumns);
-      nonPKColumns.clear();
-    }
-    byte[] onDupKeyBytesToBe = null;
-    List<Pair<ColumnName, ParseNode>> onDupKeyPairs = 
upsert.getOnDupKeyPairs();
-    OnDuplicateKeyType onDupKeyType = upsert.getOnDupKeyType();
 
-    if (onDupKeyPairs != null) {
-      if (table.isImmutableRows()) {
-        throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_IMMUTABLE)
-          .setSchemaName(table.getSchemaName().getString())
-          
.setTableName(table.getTableName().getString()).build().buildException();
-      }
-      if (table.isTransactional()) {
-        throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_TRANSACTIONAL)
-          .setSchemaName(table.getSchemaName().getString())
-          
.setTableName(table.getTableName().getString()).build().buildException();
+      nodeIndexOffset = nodeIndex;
+      // Allocate array based on size of all columns in table,
+      // since some values may not be set (if they're nullable).
+      UpsertValuesCompiler expressionBuilder = new 
UpsertValuesCompiler(context);
+      List<Expression> constantExpressions =
+        Lists.newArrayListWithExpectedSize(valueNodesItem.size());
+      // First build all the expressions, as with sequences we want to collect 
them all first
+      // and initialize them in one batch
+      List<Pair<ColumnName, ParseNode>> jsonExpressions = Lists.newArrayList();
+      List<Pair<ColumnName, ParseNode>> nonPKColumns = Lists.newArrayList();
+      for (ParseNode valueNode : valueNodesItem) {
+        if (!valueNode.hasJsonExpression() && !valueNode.isStateless()) {
+          throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.VALUE_IN_UPSERT_NOT_CONSTANT).build()
+            .buildException();
+        }
+        PColumn column = allColumns.get(columnIndexes[nodeIndex]);
+        expressionBuilder.setColumn(column);
+        Expression expression = valueNode.accept(expressionBuilder);
+        if (
+          expression.getDataType() != null
+            && !expression.getDataType().isCastableTo(column.getDataType())
+        ) {
+          throw TypeMismatchException.newException(expression.getDataType(), 
column.getDataType(),
+            "expression: " + expression.toString() + " in column " + column);
+        }
+        if (!SchemaUtil.isPKColumn(column) && !valueNode.hasJsonExpression()) {
+          nonPKColumns
+            .add(new 
Pair<>(ColumnName.caseSensitiveColumnName(column.getFamilyName().getString(),
+              column.getName().getString()), valueNode));
+        } else if (valueNode.hasJsonExpression()) {
+          jsonExpressions
+            .add(new 
Pair<>(ColumnName.caseSensitiveColumnName(column.getFamilyName().getString(),
+              column.getName().getString()), valueNode));
+        }
+        constantExpressions.add(expression);
+        nodeIndex++;
       }
-      if (connection.getSCN() != null) {
-        throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_SCN_IN_ON_DUP_KEY)
-          .setSchemaName(table.getSchemaName().getString())
-          
.setTableName(table.getTableName().getString()).build().buildException();
+      if (nonPKColumns.size() > 0 && jsonExpressions.size() > 0) {
+        jsonExpressions.addAll(nonPKColumns);
+        nonPKColumns.clear();
       }
-
-      switch (onDupKeyType) {
-        case IGNORE: {
-          onDupKeyBytesToBe = 
PhoenixIndexBuilderHelper.serializeOnDupKeyIgnore();
-          break;
+      byte[] onDupKeyBytesToBe = null;
+      List<Pair<ColumnName, ParseNode>> onDupKeyPairs = 
upsert.getOnDupKeyPairs();
+      onDupKeyType = upsert.getOnDupKeyType();
+
+      if (onDupKeyPairs != null) {
+        if (table.isImmutableRows()) {
+          throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_IMMUTABLE)
+            .setSchemaName(table.getSchemaName().getString())
+            
.setTableName(table.getTableName().getString()).build().buildException();
         }
-        case UPDATE:
-        case UPDATE_ONLY: {
-          onDupKeyBytesToBe = getOnDuplicateKeyBytes(table, context, 
onDupKeyPairs, resolver);
-          break;
+        if (table.isTransactional()) {
+          throw new SQLExceptionInfo.Builder(
+            SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_TRANSACTIONAL)
+              .setSchemaName(table.getSchemaName().getString())
+              
.setTableName(table.getTableName().getString()).build().buildException();
         }
-        default:
-          break;
+        if (connection.getSCN() != null) {
+          throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_SCN_IN_ON_DUP_KEY)
+            .setSchemaName(table.getSchemaName().getString())
+            
.setTableName(table.getTableName().getString()).build().buildException();
+        }
+
+        switch (onDupKeyType) {
+          case IGNORE: {
+            onDupKeyBytesToBe = 
PhoenixIndexBuilderHelper.serializeOnDupKeyIgnore();
+            break;
+          }
+          case UPDATE:
+          case UPDATE_ONLY: {
+            onDupKeyBytesToBe = getOnDuplicateKeyBytes(table, context, 
onDupKeyPairs, resolver);
+            break;
+          }
+          default:
+            break;
+        }
+      } else if (!jsonExpressions.isEmpty()) {
+        onDupKeyBytesToBe = getOnDuplicateKeyBytes(table, context, 
jsonExpressions, resolver);
       }
-    } else if (!jsonExpressions.isEmpty()) {
-      onDupKeyBytesToBe = getOnDuplicateKeyBytes(table, context, 
jsonExpressions, resolver);
+      onDupKeyBytes = onDupKeyBytesToBe;
+      valuesList.add(values);
+      constantExpressionsList.add(constantExpressions);
     }
-    final byte[] onDupKeyBytes = onDupKeyBytesToBe;
 
-    return new UpsertValuesMutationPlan(context, tableRef, nodeIndexOffset, 
constantExpressions,
-      allColumns, columnIndexes, overlapViewColumns, values, addViewColumns, 
connection,
+    return new UpsertValuesMutationPlan(context, tableRef, nodeIndexOffset, 
constantExpressionsList,
+      allColumns, columnIndexes, overlapViewColumns, valuesList, 
addViewColumns, connection,
       pkSlotIndexes, useServerTimestamp, onDupKeyBytes, onDupKeyType, maxSize, 
maxSizeBytes);
   }
 
@@ -1252,11 +1269,11 @@ public class UpsertCompiler {
     private final StatementContext context;
     private final TableRef tableRef;
     private final int nodeIndexOffset;
-    private final List<Expression> constantExpressions;
+    private final List<List<Expression>> constantExpressionsList;
     private final List<PColumn> allColumns;
     private final int[] columnIndexes;
     private final Set<PColumn> overlapViewColumns;
-    private final byte[][] values;
+    private final List<byte[][]> valuesList;
     private final Set<PColumn> addViewColumns;
     private final PhoenixConnection connection;
     private final int[] pkSlotIndexes;
@@ -1267,19 +1284,19 @@ public class UpsertCompiler {
     private final long maxSizeBytes;
 
     public UpsertValuesMutationPlan(StatementContext context, TableRef 
tableRef,
-      int nodeIndexOffset, List<Expression> constantExpressions, List<PColumn> 
allColumns,
-      int[] columnIndexes, Set<PColumn> overlapViewColumns, byte[][] values,
+      int nodeIndexOffset, List<List<Expression>> constantExpressionsList, 
List<PColumn> allColumns,
+      int[] columnIndexes, Set<PColumn> overlapViewColumns, List<byte[][]> 
valuesList,
       Set<PColumn> addViewColumns, PhoenixConnection connection, int[] 
pkSlotIndexes,
       boolean useServerTimestamp, byte[] onDupKeyBytes, OnDuplicateKeyType 
onDupKeyType,
       int maxSize, long maxSizeBytes) {
       this.context = context;
       this.tableRef = tableRef;
       this.nodeIndexOffset = nodeIndexOffset;
-      this.constantExpressions = constantExpressions;
+      this.constantExpressionsList = constantExpressionsList;
       this.allColumns = allColumns;
       this.columnIndexes = columnIndexes;
       this.overlapViewColumns = overlapViewColumns;
-      this.values = values;
+      this.valuesList = valuesList;
       this.addViewColumns = addViewColumns;
       this.connection = connection;
       this.pkSlotIndexes = pkSlotIndexes;
@@ -1325,74 +1342,83 @@ public class UpsertCompiler {
       ImmutableBytesWritable ptr = context.getTempPtr();
       final SequenceManager sequenceManager = context.getSequenceManager();
       // Next evaluate all the expressions
-      int nodeIndex = nodeIndexOffset;
       PTable table = tableRef.getTable();
       Tuple tuple =
         sequenceManager.getSequenceCount() == 0 ? null : 
sequenceManager.newSequenceTuple(null);
-      for (Expression constantExpression : constantExpressions) {
-        if (!constantExpression.isStateless()) {
-          nodeIndex++;
-          continue;
-        }
-        PColumn column = allColumns.get(columnIndexes[nodeIndex]);
-        constantExpression.evaluate(tuple, ptr);
-        Object value = null;
-        if (constantExpression.getDataType() != null) {
-          value = constantExpression.getDataType().toObject(ptr, 
constantExpression.getSortOrder(),
-            constantExpression.getMaxLength(), constantExpression.getScale());
-          if 
(!constantExpression.getDataType().isCoercibleTo(column.getDataType(), value)) {
-            throw 
TypeMismatchException.newException(constantExpression.getDataType(),
-              column.getDataType(),
-              "expression: " + constantExpression.toString() + " in column " + 
column);
+      for (int index = 0; index < valuesList.size(); index++) {
+        byte[][] valuesListItem = valuesList.get(index);
+        int nodeIndex = nodeIndexOffset;
+        for (Expression constantExpression : 
constantExpressionsList.get(index)) {
+          if (!constantExpression.isStateless()) {
+            nodeIndex++;
+            continue;
+          }
+          PColumn column = allColumns.get(columnIndexes[nodeIndex]);
+          constantExpression.evaluate(tuple, ptr);
+          Object value = null;
+          if (constantExpression.getDataType() != null) {
+            value =
+              constantExpression.getDataType().toObject(ptr, 
constantExpression.getSortOrder(),
+                constantExpression.getMaxLength(), 
constantExpression.getScale());
+            if 
(!constantExpression.getDataType().isCoercibleTo(column.getDataType(), value)) {
+              throw 
TypeMismatchException.newException(constantExpression.getDataType(),
+                column.getDataType(),
+                "expression: " + constantExpression.toString() + " in column " 
+ column);
+            }
+            if (
+              !column.getDataType().isSizeCompatible(ptr, value, 
constantExpression.getDataType(),
+                constantExpression.getSortOrder(), 
constantExpression.getMaxLength(),
+                constantExpression.getScale(), column.getMaxLength(), 
column.getScale())
+            ) {
+              throw new DataExceedsCapacityException(column.getDataType(), 
column.getMaxLength(),
+                column.getScale(), column.getName().getString());
+            }
           }
+          column.getDataType().coerceBytes(ptr, value, 
constantExpression.getDataType(),
+            constantExpression.getMaxLength(), constantExpression.getScale(),
+            constantExpression.getSortOrder(), column.getMaxLength(), 
column.getScale(),
+            column.getSortOrder(), table.rowKeyOrderOptimizable());
           if (
-            !column.getDataType().isSizeCompatible(ptr, value, 
constantExpression.getDataType(),
-              constantExpression.getSortOrder(), 
constantExpression.getMaxLength(),
-              constantExpression.getScale(), column.getMaxLength(), 
column.getScale())
+            overlapViewColumns.contains(column)
+              && Bytes.compareTo(ptr.get(), ptr.getOffset(), ptr.getLength(),
+                column.getViewConstant(), 0, column.getViewConstant().length - 
1) != 0
           ) {
-            throw new DataExceedsCapacityException(column.getDataType(), 
column.getMaxLength(),
-              column.getScale(), column.getName().getString());
+            throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_UPDATE_VIEW_COLUMN)
+              .setColumnName(column.getName().getString())
+              .setMessage("value=" + 
constantExpression.toString()).build().buildException();
           }
+          valuesListItem[nodeIndex] = ByteUtil.copyKeyBytesIfNecessary(ptr);
+          nodeIndex++;
         }
-        column.getDataType().coerceBytes(ptr, value, 
constantExpression.getDataType(),
-          constantExpression.getMaxLength(), constantExpression.getScale(),
-          constantExpression.getSortOrder(), column.getMaxLength(), 
column.getScale(),
-          column.getSortOrder(), table.rowKeyOrderOptimizable());
-        if (
-          overlapViewColumns.contains(column) && Bytes.compareTo(ptr.get(), 
ptr.getOffset(),
-            ptr.getLength(), column.getViewConstant(), 0, 
column.getViewConstant().length - 1) != 0
-        ) {
-          throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_UPDATE_VIEW_COLUMN)
-            .setColumnName(column.getName().getString())
-            .setMessage("value=" + 
constantExpression.toString()).build().buildException();
+        // Add columns based on view
+        for (PColumn column : addViewColumns) {
+          if (IndexUtil.getViewConstantValue(column, ptr)) {
+            valuesListItem[nodeIndex++] = 
ByteUtil.copyKeyBytesIfNecessary(ptr);
+          } else {
+            throw new IllegalStateException();
+          }
         }
-        values[nodeIndex] = ByteUtil.copyKeyBytesIfNecessary(ptr);
-        nodeIndex++;
       }
-      // Add columns based on view
-      for (PColumn column : addViewColumns) {
-        if (IndexUtil.getViewConstantValue(column, ptr)) {
-          values[nodeIndex++] = ByteUtil.copyKeyBytesIfNecessary(ptr);
-        } else {
-          throw new IllegalStateException();
+
+      MultiRowMutationState mutation = new 
MultiRowMutationState(valuesList.size());
+      for (byte[][] valuesListItems : valuesList) {
+        IndexMaintainer indexMaintainer = null;
+        byte[][] viewConstants = null;
+        if (table.getIndexType() == IndexType.LOCAL) {
+          PTable parentTable = statement.getConnection().getMetaDataCache()
+            .getTableRef(new PTableKey(statement.getConnection().getTenantId(),
+              table.getParentName().getString()))
+            .getTable();
+          indexMaintainer = table.getIndexMaintainer(parentTable, connection);
+          viewConstants = IndexUtil.getViewConstants(parentTable);
         }
+        int maxHBaseClientKeyValueSize = 
statement.getConnection().getQueryServices().getProps()
+          .getInt(QueryServices.HBASE_CLIENT_KEYVALUE_MAXSIZE,
+            QueryServicesOptions.DEFAULT_HBASE_CLIENT_KEYVALUE_MAXSIZE);
+        setValues(valuesListItems, pkSlotIndexes, columnIndexes, table, 
mutation, statement,
+          useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes, 
onDupKeyType, 0,
+          maxHBaseClientKeyValueSize);
       }
-      MultiRowMutationState mutation = new MultiRowMutationState(1);
-      IndexMaintainer indexMaintainer = null;
-      byte[][] viewConstants = null;
-      if (table.getIndexType() == IndexType.LOCAL) {
-        PTable parentTable = 
statement.getConnection().getMetaDataCache().getTableRef(
-          new PTableKey(statement.getConnection().getTenantId(), 
table.getParentName().getString()))
-          .getTable();
-        indexMaintainer = table.getIndexMaintainer(parentTable, connection);
-        viewConstants = IndexUtil.getViewConstants(parentTable);
-      }
-      int maxHBaseClientKeyValueSize = 
statement.getConnection().getQueryServices().getProps()
-        .getInt(QueryServices.HBASE_CLIENT_KEYVALUE_MAXSIZE,
-          QueryServicesOptions.DEFAULT_HBASE_CLIENT_KEYVALUE_MAXSIZE);
-      setValues(values, pkSlotIndexes, columnIndexes, table, mutation, 
statement,
-        useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes, 
onDupKeyType, 0,
-        maxHBaseClientKeyValueSize);
       return new MutationState(tableRef, mutation, 0, maxSize, maxSizeBytes, 
connection);
     }
 
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index b527ecd042..0380ae8a1c 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -1182,7 +1182,7 @@ public class PhoenixStatement implements 
PhoenixMonitoredStatement, SQLCloseable
     implements CompilableStatement {
 
     private ExecutableUpsertStatement(NamedTableNode table, HintNode hintNode,
-      List<ColumnName> columns, List<ParseNode> values, SelectStatement 
select, int bindCount,
+      List<ColumnName> columns, List<List<ParseNode>> values, SelectStatement 
select, int bindCount,
       Map<String, UDFParseNode> udfParseNodes, List<Pair<ColumnName, 
ParseNode>> onDupKeyPairs,
       OnDuplicateKeyType onDupKeyType, boolean returningRow) {
       super(table, hintNode, columns, values, select, bindCount, 
udfParseNodes, onDupKeyPairs,
@@ -2154,7 +2154,7 @@ public class PhoenixStatement implements 
PhoenixMonitoredStatement, SQLCloseable
 
     @Override
     public ExecutableUpsertStatement upsert(NamedTableNode table, HintNode 
hintNode,
-      List<ColumnName> columns, List<ParseNode> values, SelectStatement 
select, int bindCount,
+      List<ColumnName> columns, List<List<ParseNode>> values, SelectStatement 
select, int bindCount,
       Map<String, UDFParseNode> udfParseNodes, List<Pair<ColumnName, 
ParseNode>> onDupKeyPairs,
       UpsertStatement.OnDuplicateKeyType onDupKeyType, boolean returningRow) {
       return new ExecutableUpsertStatement(table, hintNode, columns, values, 
select, bindCount,
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 716dbd566a..f3199a01ee 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -912,7 +912,7 @@ public class ParseNodeFactory {
   }
 
   public UpsertStatement upsert(NamedTableNode table, HintNode hint, 
List<ColumnName> columns,
-    List<ParseNode> values, SelectStatement select, int bindCount,
+    List<List<ParseNode>> values, SelectStatement select, int bindCount,
     Map<String, UDFParseNode> udfParseNodes, List<Pair<ColumnName, ParseNode>> 
onDupKeyPairs,
     UpsertStatement.OnDuplicateKeyType onDupKeyType, boolean returningRow) {
     return new UpsertStatement(table, hint, columns, values, select, 
bindCount, udfParseNodes,
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
index d89b48bdb5..ade24f2aef 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/UpsertStatement.java
@@ -32,7 +32,7 @@ public class UpsertStatement extends DMLStatement implements 
RowReturningDMLStat
   }
 
   private final List<ColumnName> columns;
-  private final List<ParseNode> values;
+  private final List<List<ParseNode>> values;
   private final SelectStatement select;
   private final HintNode hint;
   private final List<Pair<ColumnName, ParseNode>> onDupKeyPairs;
@@ -40,7 +40,7 @@ public class UpsertStatement extends DMLStatement implements 
RowReturningDMLStat
   private final boolean returningRow;
 
   public UpsertStatement(NamedTableNode table, HintNode hint, List<ColumnName> 
columns,
-    List<ParseNode> values, SelectStatement select, int bindCount,
+    List<List<ParseNode>> values, SelectStatement select, int bindCount,
     Map<String, UDFParseNode> udfParseNodes, List<Pair<ColumnName, ParseNode>> 
onDupKeyPairs,
     OnDuplicateKeyType onDupKeyType, boolean returningRow) {
     super(table, bindCount, udfParseNodes);
@@ -57,7 +57,7 @@ public class UpsertStatement extends DMLStatement implements 
RowReturningDMLStat
     return columns;
   }
 
-  public List<ParseNode> getValues() {
+  public List<List<ParseNode>> getValues() {
     return values;
   }
 
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultipleUpsertIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultipleUpsertIT.java
new file mode 100644
index 0000000000..e12cc0e922
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultipleUpsertIT.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.util.Properties;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(ParallelStatsDisabledTest.class)
+public class MultipleUpsertIT extends ParallelStatsDisabledIT {
+  @Test
+  public void testUpsertMultiple() throws Exception {
+    Properties props = new Properties();
+    Connection conn = DriverManager.getConnection(getUrl(), props);
+    String tableName = generateUniqueName();
+    String ddl =
+      "CREATE TABLE " + tableName + "(K VARCHAR NOT NULL PRIMARY KEY, INT 
INTEGER, INT2 INTEGER)";
+    conn.createStatement().execute(ddl);
+
+    conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('A', 
11, 12)");
+    conn.createStatement().execute("UPSERT INTO " + tableName + "(K, INT) 
VALUES ('B', 2)");
+    conn.createStatement()
+      .execute("UPSERT INTO " + tableName + "(K, INT, INT2) VALUES ('E', 5, 
5),('F', 61, 6)");
+    conn.createStatement()
+      .execute("UPSERT INTO " + tableName + " VALUES ('C', 31, 32),('D', 41, 
42)");
+    conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('G', 
7, 72),('H', 8)");
+    conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('I', 
9),('I', 10)");
+    conn.commit();
+
+    ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " 
+ tableName);
+    assertTrue(rs.next());
+    assertEquals(9, rs.getInt(1));
+
+    rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " 
ORDER BY K");
+    rs.next();
+    assertEquals(rs.getString(1), "A");
+    assertEquals(rs.getInt(2), 11);
+    assertEquals(rs.getInt(3), 12);
+    rs.next();
+    assertEquals(rs.getString(1), "B");
+    assertEquals(rs.getInt(2), 2);
+    rs.next();
+    assertEquals(rs.getString(1), "C");
+    assertEquals(rs.getInt(2), 31);
+    assertEquals(rs.getInt(3), 32);
+    rs.next();
+    assertEquals(rs.getString(1), "D");
+    assertEquals(rs.getInt(2), 41);
+    assertEquals(rs.getInt(3), 42);
+    rs.next();
+    assertEquals(rs.getString(1), "E");
+    assertEquals(rs.getInt(2), 5);
+    assertEquals(rs.getInt(3), 5);
+    rs.next();
+    assertEquals(rs.getString(1), "F");
+    assertEquals(rs.getInt(2), 61);
+    assertEquals(rs.getInt(3), 6);
+    rs.next();
+    assertEquals(rs.getString(1), "G");
+    assertEquals(rs.getInt(2), 7);
+    assertEquals(rs.getInt(3), 72);
+    rs.next();
+    assertEquals(rs.getString(1), "H");
+    assertEquals(rs.getInt(2), 8);
+    rs.next();
+    assertEquals(rs.getString(1), "I");
+    assertEquals(rs.getInt(2), 10);
+  }
+
+  @Test
+  public void testUpsertMultiple2() throws Exception {
+    Properties props = new Properties();
+    Connection conn = DriverManager.getConnection(getUrl(), props);
+    String tableName = generateUniqueName();
+    String ddl = "CREATE TABLE " + tableName + "(K VARCHAR NOT NULL PRIMARY 
KEY, INT INTEGER)";
+    conn.createStatement().execute(ddl);
+
+    conn.createStatement()
+      .execute("UPSERT INTO " + tableName + " VALUES ('A', 
1),(SUBSTR('APPLE',0,2), 2*2)");
+    conn.createStatement()
+      .execute("UPSERT INTO " + tableName + " VALUES (SUBSTR('DELTA',0,1), 
5),('C', 2*3)");
+    conn.commit();
+
+    ResultSet rs =
+      conn.createStatement().executeQuery("SELECT * FROM " + tableName + " 
ORDER BY K");
+    rs.next();
+    assertEquals(rs.getString(1), "A");
+    assertEquals(rs.getInt(2), 1);
+    rs.next();
+    assertEquals(rs.getString(1), "AP");
+    assertEquals(rs.getInt(2), 4);
+    rs.next();
+    assertEquals(rs.getString(1), "C");
+    assertEquals(rs.getInt(2), 6);
+    rs.next();
+    assertEquals(rs.getString(1), "D");
+    assertEquals(rs.getInt(2), 5);
+    assertFalse(rs.next());
+
+  }
+}
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java
index 1b5c8008f7..94deefc798 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java
@@ -765,6 +765,42 @@ public class QueryParserTest {
     parseQuery(sql);
   }
 
+  @Test
+  public void testValidMultipleUpsert() throws Exception {
+    String sql = (("upsert into t VALUES(1,2),(3,4)"));
+    parseQuery(sql);
+  }
+
+  @Test
+  public void testValidMultipleUpsert2() throws Exception {
+    String sql = "upsert into t(a,b) VALUES(1,2),(3,4)";
+    parseQuery(sql);
+  }
+
+  @Test
+  public void testValidMultipleUpsert3() throws Exception {
+    String sql = (("upsert into t(a,b) VALUES(1,2),(3,4),"));
+    parseQueryThatShouldFail(sql);
+  }
+
+  @Test
+  public void testValidMultipleUpsert4() throws Exception {
+    String sql = (("upsert into t(a,b) VALUES()"));
+    parseQueryThatShouldFail(sql);
+  }
+
+  @Test
+  public void testValidMultipleUpsert5() throws Exception {
+    String sql = (("upsert into t(a,b) VALUES(1,2)(3,4)"));
+    parseQueryThatShouldFail(sql);
+  }
+
+  @Test
+  public void testValidMultipleUpsert6() throws Exception {
+    String sql = (("upsert into t(a,b) VALUES(1,2),(3,4"));
+    parseQueryThatShouldFail(sql);
+  }
+
   @Test
   public void testDeleteInvalidReturningRow() throws Exception {
     String sql = "DELETE FROM T RETURNING PK1";


Reply via email to