This is an automated email from the ASF dual-hosted git repository.

konstantinov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8a72868066 Use byte[] directly in QueryOptions instead of ByteBuffer 
and convert them to ArrayCell instead of BufferCell.
8a72868066 is described below

commit 8a7286806620928edcff733e2f6d6d7b0e663619
Author: Dmitry Konstantinov <[email protected]>
AuthorDate: Sun Dec 29 12:05:48 2024 +0000

    Use byte[] directly in QueryOptions instead of ByteBuffer and convert them 
to ArrayCell instead of BufferCell.
    
    Additionally, replace List with an array for bind values (the size is known 
in advance during decoding), so List<ByteBuffer> becomes byte[][] and, for 
batches, List<List<ByteBuffer>> becomes List<byte[][]>. QueryOptions classes now 
support both ways to get values: the old API with ByteBuffer and a new API with 
byte[].
    
    Patch by Dmitry Konstantinov; reviewed by Michael Semb Wever for 
CASSANDRA-20166
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/audit/AuditLogManager.java    |   5 +-
 .../apache/cassandra/cql3/BatchQueryOptions.java   |  62 +++++++++---
 .../org/apache/cassandra/cql3/QueryEvents.java     |  17 +++-
 .../org/apache/cassandra/cql3/QueryOptions.java    | 108 +++++++++++++++++++--
 .../org/apache/cassandra/cql3/QueryProcessor.java  |  12 +--
 .../apache/cassandra/cql3/UpdateParameters.java    |  46 ++++++---
 .../org/apache/cassandra/cql3/terms/Constants.java |  22 ++++-
 .../org/apache/cassandra/cql3/terms/Marker.java    |  38 +++++++-
 src/java/org/apache/cassandra/cql3/terms/Term.java |  18 ++++
 .../org/apache/cassandra/db/marshal/ListType.java  |  12 +++
 .../org/apache/cassandra/db/marshal/MapType.java   |  28 ++++--
 .../cassandra/db/marshal/MultiElementType.java     |   2 +
 .../org/apache/cassandra/db/marshal/SetType.java   |  18 +++-
 .../org/apache/cassandra/db/marshal/TupleType.java |  18 +++-
 .../org/apache/cassandra/db/marshal/UserType.java  |  18 +++-
 .../apache/cassandra/db/marshal/VectorType.java    |  20 +++-
 .../org/apache/cassandra/db/rows/ArrayCell.java    |  12 +++
 src/java/org/apache/cassandra/db/rows/Cell.java    |   5 +
 .../org/apache/cassandra/db/rows/NativeCell.java   |  61 ++++++++----
 .../org/apache/cassandra/fql/FullQueryLogger.java  |  21 ++--
 .../org/apache/cassandra/transport/CBUtil.java     |  61 ++++++++++++
 .../cassandra/transport/messages/BatchMessage.java |  21 ++--
 .../org/apache/cassandra/utils/ByteArrayUtil.java  |  14 +++
 .../cassandra/utils/memory/NativeAllocator.java    |   4 +-
 .../test/microbench/BatchStatementBench.java       |  10 +-
 .../org/apache/cassandra/cql3/QueryEventsTest.java |  31 +++---
 .../apache/cassandra/fql/FullQueryLoggerTest.java  |  25 +++--
 .../ClientRequestRowAndColumnMetricsTest.java      |   8 +-
 .../cassandra/transport/MessagePayloadTest.java    |   7 +-
 .../src/org/apache/cassandra/fqltool/FQLQuery.java |   8 +-
 .../apache/cassandra/fqltool/FQLReplayTest.java    |   5 +-
 32 files changed, 578 insertions(+), 160 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 2391d10537..9cac450533 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Use byte[] directly in QueryOptions instead of ByteBuffer and convert them 
to ArrayCell instead of BufferCell to reduce allocations (CASSANDRA-20166)
  * Log queries scanning too many SSTables per read (CASSANDRA-21048)
  * Extend nodetool verify to (optionally) validate SAI files (CASSANDRA-20949)
  * Fix CompressionDictionary being closed while still in use (CASSANDRA-21047)
diff --git a/src/java/org/apache/cassandra/audit/AuditLogManager.java 
b/src/java/org/apache/cassandra/audit/AuditLogManager.java
index f0e44f6f84..c453dcbb80 100644
--- a/src/java/org/apache/cassandra/audit/AuditLogManager.java
+++ b/src/java/org/apache/cassandra/audit/AuditLogManager.java
@@ -22,7 +22,6 @@ import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
-import java.nio.ByteBuffer;
 import java.security.AccessControlContext;
 import java.security.AccessController;
 import java.security.Principal;
@@ -317,7 +316,7 @@ public class AuditLogManager implements 
QueryEvents.Listener, AuthEvents.Listene
     public void batchSuccess(BatchStatement.Type batchType,
                              List<? extends CQLStatement> statements,
                              List<String> queries,
-                             List<List<ByteBuffer>> values,
+                             List<byte[][]> values,
                              QueryOptions options,
                              QueryState state,
                              long queryStartTimeMillis,
@@ -358,7 +357,7 @@ public class AuditLogManager implements 
QueryEvents.Listener, AuthEvents.Listene
         }
     }
 
-    public void batchFailure(BatchStatement.Type batchType, List<? extends 
CQLStatement> statements, List<String> queries, List<List<ByteBuffer>> values, 
QueryOptions options, QueryState state, Exception cause)
+    public void batchFailure(BatchStatement.Type batchType, List<? extends 
CQLStatement> statements, List<String> queries, List<byte[][]> values, 
QueryOptions options, QueryState state, Exception cause)
     {
         String auditMessage = String.format("BATCH of %d statements at 
consistency %s", statements.size(), options.getConsistency());
         AuditLogEntry entry = new 
AuditLogEntry.Builder(state).setOperation(auditMessage)
diff --git a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java 
b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
index dc38c4d0ce..1e9ecc87e0 100644
--- a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
@@ -31,6 +31,8 @@ import org.apache.cassandra.service.QueryState;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
 
+import static 
org.apache.cassandra.utils.ByteArrayUtil.convertToByteBufferValue;
+
 public abstract class BatchQueryOptions
 {
     public static BatchQueryOptions DEFAULT = 
withoutPerStatementVariables(QueryOptions.DEFAULT);
@@ -49,7 +51,7 @@ public abstract class BatchQueryOptions
         return new WithoutPerStatementVariables(options, 
Collections.<Object>emptyList());
     }
 
-    public static BatchQueryOptions withPerStatementVariables(QueryOptions 
options, List<List<ByteBuffer>> variables, List<Object> queryOrIdList)
+    public static BatchQueryOptions withPerStatementVariables(QueryOptions 
options, List<byte[][]> variables, List<Object> queryOrIdList)
     {
         return new WithPerStatementVariables(options, variables, 
queryOrIdList);
     }
@@ -91,6 +93,50 @@ public abstract class BatchQueryOptions
         return wrapped.getNowInSeconds(state);
     }
 
+    private static class BatchQueryOptionsWrapper extends 
QueryOptions.QueryOptionsWrapper {
+        private final byte[][] valuesAsByteArray;
+        private List<ByteBuffer> values; // initialized on demand
+
+        BatchQueryOptionsWrapper(QueryOptions wrapped, byte[][] vars)
+        {
+            super(wrapped);
+            this.valuesAsByteArray = vars;
+        }
+        public List<ByteBuffer> getValues()
+        {
+            if (values == null)
+            {
+                values = new ArrayList<>(valuesAsByteArray.length);
+                for (byte[] byteArrayValue : valuesAsByteArray)
+                    values.add(convertToByteBufferValue(byteArrayValue));
+            }
+            return values;
+        }
+
+        public int getValuesSize()
+        {
+            return valuesAsByteArray.length;
+        }
+
+        public ByteBuffer getValue(int index)
+        {
+            if (values == null) // we convert values to ByteBuffer in a lazy 
way, on demand
+                return convertToByteBufferValue(valuesAsByteArray[index]);
+            else
+                return values.get(index);
+        }
+
+        public boolean isByteArrayValuesGetSupported()
+        {
+            return true;
+        }
+
+        public byte[][] getByteArrayValues()
+        {
+            return valuesAsByteArray;
+        }
+    }
+
     private static class WithoutPerStatementVariables extends BatchQueryOptions
     {
         private WithoutPerStatementVariables(QueryOptions wrapped, 
List<Object> queryOrIdList)
@@ -108,20 +154,12 @@ public abstract class BatchQueryOptions
     {
         private final List<QueryOptions> perStatementOptions;
 
-        private WithPerStatementVariables(QueryOptions wrapped, 
List<List<ByteBuffer>> variables, List<Object> queryOrIdList)
+        private WithPerStatementVariables(QueryOptions wrapped, List<byte[][]> 
variables, List<Object> queryOrIdList)
         {
             super(wrapped, queryOrIdList);
             this.perStatementOptions = new ArrayList<>(variables.size());
-            for (final List<ByteBuffer> vars : variables)
-            {
-                perStatementOptions.add(new 
QueryOptions.QueryOptionsWrapper(wrapped)
-                {
-                    public List<ByteBuffer> getValues()
-                    {
-                        return vars;
-                    }
-                });
-            }
+            for (final byte[][] vars : variables)
+                perStatementOptions.add(new BatchQueryOptionsWrapper(wrapped, 
vars));
         }
 
         public QueryOptions forStatement(int i)
diff --git a/src/java/org/apache/cassandra/cql3/QueryEvents.java 
b/src/java/org/apache/cassandra/cql3/QueryEvents.java
index 52414ebb74..7b341ed200 100644
--- a/src/java/org/apache/cassandra/cql3/QueryEvents.java
+++ b/src/java/org/apache/cassandra/cql3/QueryEvents.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.cql3;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
@@ -143,10 +142,14 @@ public class QueryEvents
         }
     }
 
+    /**
+     * Note: we use {@code byte[][]} for values of batch statement elements 
instead of {@code List<ByteBuffer>} here
+     * to reduce memory allocation when we decode and process a batch statement
+     */
     public void notifyBatchSuccess(BatchStatement.Type batchType,
                                    List<? extends CQLStatement> statements,
                                    List<String> queries,
-                                   List<List<ByteBuffer>> values,
+                                   List<byte[][]> values,
                                    QueryOptions options,
                                    QueryState state,
                                    long queryTime,
@@ -164,10 +167,14 @@ public class QueryEvents
         }
     }
 
+    /**
+     * Note: we use {@code byte[][]} for values of batch statement elements 
instead of {@code List<ByteBuffer>} here
+     * to reduce memory allocation when we decode and process a batch statement
+     */
     public void notifyBatchFailure(List<QueryHandler.Prepared> prepared,
                                    BatchStatement.Type batchType,
                                    List<Object> queryOrIdList,
-                                   List<List<ByteBuffer>> values,
+                                   List<byte[][]> values,
                                    QueryOptions options,
                                    QueryState state,
                                    Exception cause)
@@ -288,7 +295,7 @@ public class QueryEvents
         default void batchSuccess(BatchStatement.Type batchType,
                                   List<? extends CQLStatement> statements,
                                   List<String> queries,
-                                  List<List<ByteBuffer>> values,
+                                  List<byte[][]> values,
                                   QueryOptions options,
                                   QueryState state,
                                   long queryTime,
@@ -296,7 +303,7 @@ public class QueryEvents
         default void batchFailure(BatchStatement.Type batchType,
                                   List<? extends CQLStatement> statements,
                                   List<String> queries,
-                                  List<List<ByteBuffer>> values,
+                                  List<byte[][]> values,
                                   QueryOptions options,
                                   QueryState state,
                                   Exception cause) {}
diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java 
b/src/java/org/apache/cassandra/cql3/QueryOptions.java
index c4e6e33b25..fae5072460 100644
--- a/src/java/org/apache/cassandra/cql3/QueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java
@@ -37,11 +37,14 @@ import org.apache.cassandra.transport.CBCodec;
 import org.apache.cassandra.transport.CBUtil;
 import org.apache.cassandra.transport.ProtocolException;
 import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ByteArrayUtil;
 import org.apache.cassandra.utils.CassandraUInt;
 import org.apache.cassandra.utils.Pair;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
 
+import static 
org.apache.cassandra.utils.ByteArrayUtil.convertToByteBufferValue;
+
 /**
  * Options for a query.
  */
@@ -49,6 +52,7 @@ public abstract class QueryOptions
 {
     public static final QueryOptions DEFAULT = new 
DefaultQueryOptions(ConsistencyLevel.ONE,
                                                                        
Collections.emptyList(),
+                                                                       
ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS,
                                                                        false,
                                                                        
SpecificOptions.DEFAULT,
                                                                        
ProtocolVersion.CURRENT);
@@ -62,22 +66,22 @@ public abstract class QueryOptions
 
     public static QueryOptions forInternalCalls(ConsistencyLevel consistency, 
List<ByteBuffer> values)
     {
-        return new DefaultQueryOptions(consistency, values, false, 
SpecificOptions.DEFAULT, ProtocolVersion.V3);
+        return new DefaultQueryOptions(consistency, values, 
ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS, false, SpecificOptions.DEFAULT, 
ProtocolVersion.V3);
     }
 
     public static QueryOptions forInternalCallsWithNowInSec(long nowInSec, 
ConsistencyLevel consistency, List<ByteBuffer> values)
     {
-        return new DefaultQueryOptions(consistency, values, false, 
SpecificOptions.DEFAULT.withNowInSec(nowInSec), ProtocolVersion.CURRENT);
+        return new DefaultQueryOptions(consistency, values, 
ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS, false, 
SpecificOptions.DEFAULT.withNowInSec(nowInSec), ProtocolVersion.CURRENT);
     }
 
     public static QueryOptions forInternalCalls(List<ByteBuffer> values)
     {
-        return new DefaultQueryOptions(ConsistencyLevel.ONE, values, false, 
SpecificOptions.DEFAULT, ProtocolVersion.V3);
+        return new DefaultQueryOptions(ConsistencyLevel.ONE, values, 
ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS, false, SpecificOptions.DEFAULT, 
ProtocolVersion.V3);
     }
 
     public static QueryOptions forProtocolVersion(ProtocolVersion 
protocolVersion)
     {
-        return new DefaultQueryOptions(null, null, true, null, 
protocolVersion);
+        return new DefaultQueryOptions(null, null, 
ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS, true, null, protocolVersion);
     }
 
     public static QueryOptions create(ConsistencyLevel consistency,
@@ -105,6 +109,7 @@ public abstract class QueryOptions
     {
         return new DefaultQueryOptions(consistency,
                                        values,
+                                       
ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS,
                                        skipMetadata,
                                        new SpecificOptions(pageSize, 
pagingState, serialConsistency, timestamp, keyspace, nowInSeconds),
                                        version);
@@ -127,6 +132,34 @@ public abstract class QueryOptions
 
     public abstract ConsistencyLevel getConsistency();
     public abstract List<ByteBuffer> getValues();
+
+    public abstract int getValuesSize();
+
+    public ByteBuffer getValue(int index)
+    {
+        return getValues().get(index);
+    }
+
+    /**
+     * to check if it is possible to get values as byte[] instead of ByteBuffer
+     * the logic is added as a memory allocation optimization
+     * to avoid creating of HeapByteBuffer wrappers for some typical requests
+     * getValues method is still in use and must be supported even if true is 
returned
+     */
+    public boolean isByteArrayValuesGetSupported()
+    {
+        return false;
+    }
+
+    /**
+     * an allocation-optimized version of getValues() method
+     */
+    public byte[][] getByteArrayValues()
+    {
+        throw new IllegalStateException("getByteArrayValues() method is not 
implemented, " +
+                                        "isByteArrayValuesGetSupported() must 
be always checked before invoking this method");
+    }
+
     public abstract boolean skipMetadata();
 
     /**
@@ -338,7 +371,9 @@ public abstract class QueryOptions
     static class DefaultQueryOptions extends QueryOptions
     {
         private final ConsistencyLevel consistency;
-        private final List<ByteBuffer> values;
+        private List<ByteBuffer> values; // initialized on demand
+        private final byte[][] valuesAsByteArray;
+        private final boolean isByteArrayGetSupported;
         private final boolean skipMetadata;
 
         private final SpecificOptions options;
@@ -346,10 +381,14 @@ public abstract class QueryOptions
         private final transient ProtocolVersion protocolVersion;
         private final transient ReadThresholds readThresholds = 
ReadThresholds.create();
 
-        DefaultQueryOptions(ConsistencyLevel consistency, List<ByteBuffer> 
values, boolean skipMetadata, SpecificOptions options, ProtocolVersion 
protocolVersion)
+        DefaultQueryOptions(ConsistencyLevel consistency, List<ByteBuffer> 
values,
+                            byte[][] valuesAsByteArray, boolean skipMetadata,
+                            SpecificOptions options, ProtocolVersion 
protocolVersion)
         {
             this.consistency = consistency;
             this.values = values;
+            isByteArrayGetSupported = (values == null);
+            this.valuesAsByteArray = valuesAsByteArray;
             this.skipMetadata = skipMetadata;
             this.options = options;
             this.protocolVersion = protocolVersion;
@@ -362,9 +401,38 @@ public abstract class QueryOptions
 
         public List<ByteBuffer> getValues()
         {
+            if (values == null)
+            {
+                values = new ArrayList<>(valuesAsByteArray.length);
+                for (byte[] byteArrayValue : valuesAsByteArray)
+                    values.add(convertToByteBufferValue(byteArrayValue));
+            }
             return values;
         }
 
+        public int getValuesSize()
+        {
+            return isByteArrayValuesGetSupported() ? valuesAsByteArray.length 
: values.size();
+        }
+
+        public ByteBuffer getValue(int index)
+        {
+            if (values != null)
+                return values.get(index);
+
+            return convertToByteBufferValue(valuesAsByteArray[index]);
+        }
+
+        public boolean isByteArrayValuesGetSupported()
+        {
+            return isByteArrayGetSupported;
+        }
+
+        public byte[][] getByteArrayValues()
+        {
+            return valuesAsByteArray;
+        }
+
         public boolean skipMetadata()
         {
             return skipMetadata;
@@ -406,6 +474,26 @@ public abstract class QueryOptions
             return wrapped.getConsistency();
         }
 
+        public int getValuesSize()
+        {
+            return wrapped.getValuesSize();
+        }
+
+        public ByteBuffer getValue(int index)
+        {
+            return this.wrapped.getValue(index);
+        }
+
+        public boolean isByteArrayValuesGetSupported()
+        {
+            return wrapped.isByteArrayValuesGetSupported();
+        }
+
+        public byte[][] getByteArrayValues()
+        {
+            return wrapped.getByteArrayValues();
+        }
+
         public boolean skipMetadata()
         {
             return wrapped.skipMetadata();
@@ -625,19 +713,19 @@ public abstract class QueryOptions
                         ? (int)body.readUnsignedInt()
                         : (int)body.readUnsignedByte();
 
-            List<ByteBuffer> values = Collections.<ByteBuffer>emptyList();
+            byte[][] values = ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS;
             List<String> names = null;
             if (Flag.contains(flags, Flag.VALUES))
             {
                 if (Flag.contains(flags, Flag.NAMES_FOR_VALUES))
                 {
-                    Pair<List<String>, List<ByteBuffer>> namesAndValues = 
CBUtil.readNameAndValueList(body, version);
+                    Pair<List<String>, byte[][]> namesAndValues = 
CBUtil.readNameAndValueListAsByteArrays(body, version);
                     names = namesAndValues.left;
                     values = namesAndValues.right;
                 }
                 else
                 {
-                    values = CBUtil.readValueList(body, version);
+                    values = CBUtil.readValueListAsByteArrays(body, version);
                 }
             }
 
@@ -665,7 +753,7 @@ public abstract class QueryOptions
                 options = new SpecificOptions(pageSize, pagingState, 
serialConsistency, timestamp, keyspace, nowInSeconds);
             }
 
-            DefaultQueryOptions opts = new DefaultQueryOptions(consistency, 
values, skipMetadata, options, version);
+            DefaultQueryOptions opts = new DefaultQueryOptions(consistency, 
null, values, skipMetadata, options, version);
             return names == null ? opts : new OptionsWithNames(opts, names);
         }
 
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java 
b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 7cb1d668e0..56fca66220 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -914,19 +914,19 @@ public class QueryProcessor implements QueryHandler
     public ResultMessage processPrepared(CQLStatement statement, QueryState 
queryState, QueryOptions options, Dispatcher.RequestTime requestTime)
     throws RequestExecutionException, RequestValidationException
     {
-        List<ByteBuffer> variables = options.getValues();
+        int variablesSize = options.getValuesSize();
         // Check to see if there are any bound variables to verify
-        if (!(variables.isEmpty() && statement.getBindVariables().isEmpty()))
+        if (!(variablesSize == 0 && statement.getBindVariables().isEmpty()))
         {
-            if (variables.size() != statement.getBindVariables().size())
+            if (variablesSize != statement.getBindVariables().size())
                 throw new InvalidRequestException(String.format("there were %d 
markers(?) in CQL but %d bound variables",
                                                                 
statement.getBindVariables().size(),
-                                                                
variables.size()));
+                                                                
variablesSize));
 
             // at this point there is a match in count between markers and 
variables that is non-zero
             if (logger.isTraceEnabled())
-                for (int i = 0; i < variables.size(); i++)
-                    logger.trace("[{}] '{}'", i+1, variables.get(i));
+                for (int i = 0; i < variablesSize; i++)
+                    logger.trace("[{}] '{}'", i+1, options.getValues().get(i));
         }
 
         metrics.preparedStatementsExecuted.inc();
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java 
b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index 85a97ccc3d..a0a51a3829 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.Slice;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.guardrails.Guardrails;
 import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.db.rows.ArrayCell;
 import org.apache.cassandra.db.rows.BTreeRow;
 import org.apache.cassandra.db.rows.BufferCell;
 import org.apache.cassandra.db.rows.Cell;
@@ -165,16 +166,14 @@ public class UpdateParameters
         return addCell(column, null, value);
     }
 
-    public Cell<?> addCell(ColumnMetadata column, CellPath path, ByteBuffer 
value) throws InvalidRequestException
+    public Cell<?> addCell(ColumnMetadata column, byte[] value) throws 
InvalidRequestException
     {
-        // General column value size
-        Guardrails.columnValueSize.guard(value.remaining(), 
column.name.toString(), false, clientState);
-
-        // Check specific sizes per column type
-        validateColumnSize(column, value);
+        return addCell(column, null, value);
+    }
 
-        if (path != null && column.type.isMultiCell())
-            Guardrails.columnValueSize.guard(path.dataSize(), 
column.name.toString(), false, clientState);
+    public Cell<?> addCell(ColumnMetadata column, CellPath path, ByteBuffer 
value) throws InvalidRequestException
+    {
+        validateCell(column, path, value.remaining());
 
         Cell<?> cell = ttl == LivenessInfo.NO_TTL
                        ? BufferCell.live(column, timestamp, value, path)
@@ -183,6 +182,29 @@ public class UpdateParameters
         return cell;
     }
 
+    public Cell<?> addCell(ColumnMetadata column, CellPath path, byte[] value) 
throws InvalidRequestException
+    {
+        validateCell(column, path, value.length);
+
+        Cell<?> cell = ttl == LivenessInfo.NO_TTL
+                       ? ArrayCell.live(column, timestamp, value, path)
+                       : ArrayCell.expiring(column, timestamp, ttl, nowInSec, 
value, path);
+        builder.addCell(cell);
+        return cell;
+    }
+
+    private void validateCell(ColumnMetadata column, CellPath path, int 
valueSize)
+    {
+        // General column value size
+        Guardrails.columnValueSize.guard(valueSize, column.name.toString(), 
false, clientState);
+
+        // Check specific sizes per column type
+        validateColumnSize(column, valueSize);
+
+        if (path != null && column.type.isMultiCell())
+            Guardrails.columnValueSize.guard(path.dataSize(), 
column.name.toString(), false, clientState);
+    }
+
     public void addRow(Row row)
     {
         newRow(row.clustering());
@@ -206,20 +228,20 @@ public class UpdateParameters
         });
     }
 
-    private void validateColumnSize(ColumnMetadata column, ByteBuffer value)
+    private void validateColumnSize(ColumnMetadata column, int valueSize)
     {
         CQL3Type cql3Type = column.type.asCQL3Type();
         if (cql3Type.equals(CQL3Type.Native.ASCII)) // Ascii size specific 
guardrail
         {
-            Guardrails.columnAsciiValueSize.guard(value.remaining(), 
column.name.toString(), false, clientState);
+            Guardrails.columnAsciiValueSize.guard(valueSize, 
column.name.toString(), false, clientState);
         }
         else if (cql3Type.equals(CQL3Type.Native.BLOB)) // Blob size specific 
guardrail
         {
-            Guardrails.columnBlobValueSize.guard(value.remaining(), 
column.name.toString(), false, clientState);
+            Guardrails.columnBlobValueSize.guard(valueSize, 
column.name.toString(), false, clientState);
         }
         else if (cql3Type.equals(CQL3Type.Native.TEXT)) // text and varchar 
size specific guardrails
         {
-            Guardrails.columnTextAndVarcharValueSize.guard(value.remaining(), 
column.name.toString(), false, clientState);
+            Guardrails.columnTextAndVarcharValueSize.guard(valueSize, 
column.name.toString(), false, clientState);
         }
     }
 
diff --git a/src/java/org/apache/cassandra/cql3/terms/Constants.java 
b/src/java/org/apache/cassandra/cql3/terms/Constants.java
index a912f5556d..49d9592567 100644
--- a/src/java/org/apache/cassandra/cql3/terms/Constants.java
+++ b/src/java/org/apache/cassandra/cql3/terms/Constants.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.ByteArrayUtil;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FastByteOperations;
 
@@ -479,11 +480,22 @@ public abstract class Constants
 
         public void execute(DecoratedKey partitionKey, UpdateParameters 
params) throws InvalidRequestException
         {
-            ByteBuffer value = t.bindAndGet(params.options);
-            if (value == null)
-                params.addTombstone(column);
-            else if (value != ByteBufferUtil.UNSET_BYTE_BUFFER) // use 
reference equality and not object equality
-                params.addCell(column, value);
+            if (t.isByteArrayGetSupported(params.options))
+            {
+                byte[] value = t.bindAndGetByteArray(params.options);
+                if (value == null)
+                    params.addTombstone(column);
+                else if (value != ByteArrayUtil.UNSET_BYTE_ARRAY) // use 
reference equality and not object equality
+                    params.addCell(column, value);
+            }
+            else
+            {
+                ByteBuffer value = t.bindAndGet(params.options);
+                if (value == null)
+                    params.addTombstone(column);
+                else if (value != ByteBufferUtil.UNSET_BYTE_BUFFER) // use 
reference equality and not object equality
+                    params.addCell(column, value);
+            }
         }
     }
 
diff --git a/src/java/org/apache/cassandra/cql3/terms/Marker.java 
b/src/java/org/apache/cassandra/cql3/terms/Marker.java
index 48fa02e9f4..26ff845987 100644
--- a/src/java/org/apache/cassandra/cql3/terms/Marker.java
+++ b/src/java/org/apache/cassandra/cql3/terms/Marker.java
@@ -26,9 +26,11 @@ import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.VariableSpecifications;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ByteArrayAccessor;
 import org.apache.cassandra.db.marshal.MultiElementType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.ByteArrayUtil;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
@@ -74,7 +76,7 @@ public final class Marker extends Term.NonTerminal
     {
         try
         {
-            ByteBuffer bytes = options.getValues().get(bindIndex);
+            ByteBuffer bytes = options.getValue(bindIndex);
             if (bytes == null)
                 return null;
 
@@ -93,6 +95,40 @@ public final class Marker extends Term.NonTerminal
         }
     }
 
+    public boolean isByteArrayGetSupported(QueryOptions options)
+    {
+        return options.isByteArrayValuesGetSupported();
+    }
+
+    /*
+       Same logic as bind() but it returns byte[] instead of ByteBuffer and 
there is no Value wrapper usage
+     */
+    @Override
+    public byte[] bindAndGetByteArray(QueryOptions options) throws 
InvalidRequestException
+    {
+        try
+        {
+            byte[] bytes = options.getByteArrayValues()[bindIndex];
+            if (bytes == null)
+                return null;
+
+            if (bytes == ByteArrayUtil.UNSET_BYTE_ARRAY)
+                return ByteArrayUtil.UNSET_BYTE_ARRAY;
+
+            if (receiver.type instanceof MultiElementType<?>)
+            {
+                MultiElementType<?> type = (MultiElementType<?>) receiver.type;
+                return 
type.pack(type.filterSortAndValidateElementsFromArrays(type.unpack(bytes)), 
ByteArrayAccessor.instance);
+            }
+            receiver.type.validate(bytes, ByteArrayAccessor.instance);
+            return bytes;
+        }
+        catch (MarshalException e)
+        {
+            throw new InvalidRequestException(e.getMessage());
+        }
+    }
+
     @Override
     public String toString()
     {
diff --git a/src/java/org/apache/cassandra/cql3/terms/Term.java 
b/src/java/org/apache/cassandra/cql3/terms/Term.java
index b7babb5e5e..0fc2673922 100644
--- a/src/java/org/apache/cassandra/cql3/terms/Term.java
+++ b/src/java/org/apache/cassandra/cql3/terms/Term.java
@@ -89,6 +89,24 @@ public interface Term
      */
     ByteBuffer bindAndGet(QueryOptions options);
 
+    /**
+     * Returns true if an optimized bindAndGetByteArray() method is implemented 
and can be used for this type of Term
+     * to retrieve a value as byte[] instead of the default ByteBuffer provided by 
bindAndGet().
+     */
+    default boolean isByteArrayGetSupported(QueryOptions options)
+    {
+        return false;
+    }
+
+    /**
+     * An allocation-optimized version of the bindAndGet() method.
+     */
+    default byte[] bindAndGetByteArray(QueryOptions options) throws 
InvalidRequestException
+    {
+        throw new IllegalStateException("bindAndGetByteArray() method is not 
implemented, " +
+                                        "isByteArrayGetSupported() must be 
always checked before invoking this method");
+    }
+
     /**
      * A shortcut for {@code bind(options).getElements()}.
      * We expose it mainly because for constants it can avoid allocating a 
temporary
diff --git a/src/java/org/apache/cassandra/db/marshal/ListType.java 
b/src/java/org/apache/cassandra/db/marshal/ListType.java
index 6c391b050e..cb40b1ae2d 100644
--- a/src/java/org/apache/cassandra/db/marshal/ListType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ListType.java
@@ -280,6 +280,18 @@ public class ListType<T> extends CollectionType<List<T>>
         return buffers;
     }
 
+    @Override
+    public List<byte[]> filterSortAndValidateElementsFromArrays(List<byte[]> 
buffers)
+    {
+        for (byte[] buffer: buffers)
+        {
+            if (buffer == null)
+                throw new MarshalException("null is not supported inside 
collections");
+            elements.validate(buffer, ByteArrayAccessor.instance);
+        }
+        return buffers;
+    }
+
     @Override
     protected int compareNextCell(Iterator<Cell<?>> cellIterator, 
Iterator<ByteBuffer> elementIter)
     {
diff --git a/src/java/org/apache/cassandra/db/marshal/MapType.java 
b/src/java/org/apache/cassandra/db/marshal/MapType.java
index 8ed43c3c8e..8d206c1572 100644
--- a/src/java/org/apache/cassandra/db/marshal/MapType.java
+++ b/src/java/org/apache/cassandra/db/marshal/MapType.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -383,26 +384,37 @@ public class MapType<K, V> extends CollectionType<Map<K, 
V>>
 
     @Override
     public List<ByteBuffer> filterSortAndValidateElements(List<ByteBuffer> 
buffers)
+    {
+        return filterSortAndValidateElements(buffers, 
ByteBufferAccessor.instance, getKeysType().comparatorSet.buffer);
+    }
+
+    @Override
+    public List<byte[]> filterSortAndValidateElementsFromArrays(List<byte[]> 
buffers)
+    {
+        return filterSortAndValidateElements(buffers, 
ByteArrayAccessor.instance, getKeysType().comparatorSet.array);
+    }
+
+    private <T> List<T> filterSortAndValidateElements(List<T> buffers, 
ValueAccessor<T> valueAccessor, Comparator<T> comparator)
     {
         // We depend on Maps to be properly sorted by their keys, so use a 
sorted map implementation here.
-        SortedMap<ByteBuffer, ByteBuffer> map = new TreeMap<>(getKeysType());
-        Iterator<ByteBuffer> iter = buffers.iterator();
+        SortedMap<T, T> map = new TreeMap<>(comparator);
+        Iterator<T> iter = buffers.iterator();
         while (iter.hasNext())
         {
-            ByteBuffer keyBytes = iter.next();
-            ByteBuffer valueBytes = iter.next();
+            T keyBytes = iter.next();
+            T valueBytes = iter.next();
 
             if (keyBytes == null || valueBytes == null)
                 throw new MarshalException("null is not supported inside 
collections");
 
-            getKeysType().validate(keyBytes);
-            getValuesType().validate(valueBytes);
+            getKeysType().validate(keyBytes, valueAccessor);
+            getValuesType().validate(valueBytes, valueAccessor);
 
             map.put(keyBytes, valueBytes);
         }
 
-        List<ByteBuffer> sortedBuffers = new ArrayList<>(map.size() << 1);
-        for (Map.Entry<ByteBuffer, ByteBuffer> entry : map.entrySet())
+        List<T> sortedBuffers = new ArrayList<>(map.size() << 1);
+        for (Map.Entry<T, T> entry : map.entrySet())
         {
             sortedBuffers.add(entry.getKey());
             sortedBuffers.add(entry.getValue());
diff --git a/src/java/org/apache/cassandra/db/marshal/MultiElementType.java 
b/src/java/org/apache/cassandra/db/marshal/MultiElementType.java
index 99ce762d6a..dc8f912e09 100644
--- a/src/java/org/apache/cassandra/db/marshal/MultiElementType.java
+++ b/src/java/org/apache/cassandra/db/marshal/MultiElementType.java
@@ -99,6 +99,8 @@ public abstract class MultiElementType<T> extends 
AbstractType<T>
      */
     public abstract List<ByteBuffer> 
filterSortAndValidateElements(List<ByteBuffer> buffers);
 
+    public abstract List<byte[]> 
filterSortAndValidateElementsFromArrays(List<byte[]> buffers);
+
     /**
      * Compares the multicell value represented by the column data with the 
specified elements.
      * @param columnData the column data representing the multicell value
diff --git a/src/java/org/apache/cassandra/db/marshal/SetType.java 
b/src/java/org/apache/cassandra/db/marshal/SetType.java
index f2568e3cbc..8a8ae45e04 100644
--- a/src/java/org/apache/cassandra/db/marshal/SetType.java
+++ b/src/java/org/apache/cassandra/db/marshal/SetType.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.marshal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -255,12 +256,23 @@ public class SetType<T> extends CollectionType<Set<T>>
     @Override
     public List<ByteBuffer> filterSortAndValidateElements(List<ByteBuffer> 
buffers)
     {
-        SortedSet<ByteBuffer> sorted = new TreeSet<>(elements);
-        for (ByteBuffer buffer: buffers)
+        return filterSortAndValidateElements(buffers, 
ByteBufferAccessor.instance, elements.comparatorSet.buffer);
+    }
+
+    @Override
+    public List<byte[]> filterSortAndValidateElementsFromArrays(List<byte[]> 
buffers)
+    {
+        return filterSortAndValidateElements(buffers, 
ByteArrayAccessor.instance, elements.comparatorSet.array);
+    }
+
+    private <V> List<V> filterSortAndValidateElements(List<V> buffers, 
ValueAccessor<V> valueAccessor, Comparator<V> comparator)
+    {
+        SortedSet<V> sorted = new TreeSet<>(comparator);
+        for (V buffer: buffers)
         {
             if (buffer == null)
                 throw new MarshalException("null is not supported inside 
collections");
-            elements.validate(buffer);
+            elements.validate(buffer, valueAccessor);
             sorted.add(buffer);
         }
         return new ArrayList<>(sorted);
diff --git a/src/java/org/apache/cassandra/db/marshal/TupleType.java 
b/src/java/org/apache/cassandra/db/marshal/TupleType.java
index 47301b6e97..8004507762 100644
--- a/src/java/org/apache/cassandra/db/marshal/TupleType.java
+++ b/src/java/org/apache/cassandra/db/marshal/TupleType.java
@@ -40,6 +40,7 @@ import 
org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.serializers.*;
 import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ByteArrayUtil;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.JsonUtils;
 import org.apache.cassandra.utils.bytecomparable.ByteComparable;
@@ -388,6 +389,17 @@ public class TupleType extends MultiElementType<ByteBuffer>
 
     @Override
     public List<ByteBuffer> filterSortAndValidateElements(List<ByteBuffer> 
buffers)
+    {
+        return filterSortAndValidateElements(buffers, 
ByteBufferUtil.UNSET_BYTE_BUFFER, ByteBufferAccessor.instance);
+    }
+
+    @Override
+    public List<byte[]> filterSortAndValidateElementsFromArrays(List<byte[]> 
buffers)
+    {
+        return filterSortAndValidateElements(buffers, 
ByteArrayUtil.UNSET_BYTE_ARRAY, ByteArrayAccessor.instance);
+    }
+
+    private <T> List<T> filterSortAndValidateElements(List<T> buffers, T 
unsetValue, ValueAccessor<T> valueAccessor)
     {
         if (buffers.size() > size())
             throw new MarshalException(String.format("Tuple value contains too 
many fields (expected %s, got %s)", size(), buffers.size()));
@@ -395,12 +407,12 @@ public class TupleType extends 
MultiElementType<ByteBuffer>
         for (int i = 0; i < buffers.size(); i++)
         {
             // Since a tuple value is always written in its entirety Cassandra 
can't preserve a pre-existing value by 'not setting' the new value. Reject the 
query.
-            ByteBuffer buffer = buffers.get(i);
+            T buffer = buffers.get(i);
             if (buffer == null)
                 continue;
-            if (buffer == ByteBufferUtil.UNSET_BYTE_BUFFER)
+            if (buffer == unsetValue)
                 throw new InvalidRequestException(String.format("Invalid unset 
value for tuple field number %d", i));
-            type(i).validate(buffer);
+            type(i).validate(buffer, valueAccessor);
         }
 
         return buffers;
diff --git a/src/java/org/apache/cassandra/db/marshal/UserType.java 
b/src/java/org/apache/cassandra/db/marshal/UserType.java
index 23b172c816..1f0f703070 100644
--- a/src/java/org/apache/cassandra/db/marshal/UserType.java
+++ b/src/java/org/apache/cassandra/db/marshal/UserType.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.serializers.TypeSerializer;
 import org.apache.cassandra.serializers.UserTypeSerializer;
 import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ByteArrayUtil;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.JsonUtils;
 import org.apache.cassandra.utils.Pair;
@@ -644,6 +645,17 @@ public class UserType extends TupleType implements 
SchemaElement
 
     @Override
     public List<ByteBuffer> filterSortAndValidateElements(List<ByteBuffer> 
buffers)
+    {
+        return filterSortAndValidateElements(buffers, 
ByteBufferUtil.UNSET_BYTE_BUFFER, ByteBufferAccessor.instance);
+    }
+
+    @Override
+    public List<byte[]> filterSortAndValidateElementsFromArrays(List<byte[]> 
buffers)
+    {
+        return filterSortAndValidateElements(buffers, 
ByteArrayUtil.UNSET_BYTE_ARRAY, ByteArrayAccessor.instance);
+    }
+
+    private <T> List<T> filterSortAndValidateElements(List<T> buffers, T 
unsetValue, ValueAccessor<T> valueAccessor)
     {
         if (buffers.size() > size())
             throw new MarshalException(String.format("UDT value contained too 
many fields (expected %s, got %s)", size(), buffers.size()));
@@ -652,12 +664,12 @@ public class UserType extends TupleType implements 
SchemaElement
         {
             // Since a frozen UDT value is always written in its entirety 
Cassandra can't preserve a pre-existing
             // value by 'not setting' the new value. Reject the query.
-            ByteBuffer buffer = buffers.get(i);
+            T buffer = buffers.get(i);
             if (buffer == null)
                 continue;
-            if (!isMultiCell() && buffer == ByteBufferUtil.UNSET_BYTE_BUFFER)
+            if (!isMultiCell() && buffer == unsetValue)
                 throw new MarshalException(String.format("Invalid unset value 
for field '%s' of user defined type %s", fieldNameAsString(i), 
getNameAsString()));
-            type(i).validate(buffer);
+            type(i).validate(buffer, valueAccessor);
         }
 
         return buffers;
diff --git a/src/java/org/apache/cassandra/db/marshal/VectorType.java 
b/src/java/org/apache/cassandra/db/marshal/VectorType.java
index e70857c5aa..3cdf72270e 100644
--- a/src/java/org/apache/cassandra/db/marshal/VectorType.java
+++ b/src/java/org/apache/cassandra/db/marshal/VectorType.java
@@ -37,6 +37,7 @@ import 
org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.serializers.TypeSerializer;
 import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ByteArrayUtil;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.JsonUtils;
 import org.apache.cassandra.utils.bytecomparable.ByteComparable;
@@ -194,20 +195,31 @@ public final class VectorType<T> extends 
MultiElementType<List<T>>
 
     @Override
     public List<ByteBuffer> filterSortAndValidateElements(List<ByteBuffer> 
buffers)
+    {
+        return filterSortAndValidateElements(buffers, 
ByteBufferUtil.UNSET_BYTE_BUFFER, ByteBufferAccessor.instance);
+    }
+
+    @Override
+    public List<byte[]> filterSortAndValidateElementsFromArrays(List<byte[]> 
buffers)
+    {
+        return filterSortAndValidateElements(buffers, 
ByteArrayUtil.UNSET_BYTE_ARRAY, ByteArrayAccessor.instance);
+    }
+
+    public <V> List<V> filterSortAndValidateElements(List<V> buffers, V 
unsetValue, ValueAccessor<V> valueAccessor)
     {
         // We only filter and validate for this type.
         if (buffers == null)
             return null;
 
-        for (ByteBuffer buffer: buffers)
+        for (V buffer : buffers)
         {
-            if (buffer == null || elementType.isNull(buffer))
+            if (buffer == null || elementType.isNull(buffer, valueAccessor))
                 throw new MarshalException("null is not supported inside 
vectors");
 
-            if (buffer == ByteBufferUtil.UNSET_BYTE_BUFFER )
+            if (buffer == unsetValue)
                 throw new InvalidRequestException("unset is not supported 
inside vectors");
 
-            elementType.validate(buffer);
+            elementType.validate(buffer, valueAccessor);
         }
         return buffers;
     }
diff --git a/src/java/org/apache/cassandra/db/rows/ArrayCell.java 
b/src/java/org/apache/cassandra/db/rows/ArrayCell.java
index 8201b6849f..77d44c3eb7 100644
--- a/src/java/org/apache/cassandra/db/rows/ArrayCell.java
+++ b/src/java/org/apache/cassandra/db/rows/ArrayCell.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.rows;
 
 import java.nio.ByteBuffer;
 
+import org.apache.cassandra.db.ExpirationDateOverflowHandling;
 import org.apache.cassandra.db.marshal.ByteArrayAccessor;
 import org.apache.cassandra.db.marshal.ByteType;
 import org.apache.cassandra.db.marshal.ValueAccessor;
@@ -59,6 +60,17 @@ public class ArrayCell extends AbstractCell<byte[]>
         this.path = path;
     }
 
+    public static ArrayCell live(ColumnMetadata column, long timestamp, byte[] 
value, CellPath path)
+    {
+        return new ArrayCell(column, timestamp, NO_TTL, NO_DELETION_TIME, 
value, path);
+    }
+
+    public static ArrayCell expiring(ColumnMetadata column, long timestamp, 
int ttl, long nowInSec, byte[] value, CellPath path)
+    {
+        assert ttl != NO_TTL;
+        return new ArrayCell(column, timestamp, ttl, 
ExpirationDateOverflowHandling.computeLocalExpirationTime(nowInSec, ttl), 
value, path);
+    }
+
     public long timestamp()
     {
         return timestamp;
diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java 
b/src/java/org/apache/cassandra/db/rows/Cell.java
index ced2f5229a..d3d0eb0416 100644
--- a/src/java/org/apache/cassandra/db/rows/Cell.java
+++ b/src/java/org/apache/cassandra/db/rows/Cell.java
@@ -120,6 +120,11 @@ public abstract class Cell<V> extends ColumnData
         return accessor().toBuffer(value());
     }
 
+    public byte[] valueAsArray()
+    {
+        return accessor().toArray(value());
+    }
+
     /**
      * The cell timestamp.
      * <p>
diff --git a/src/java/org/apache/cassandra/db/rows/NativeCell.java 
b/src/java/org/apache/cassandra/db/rows/NativeCell.java
index ebea8a319b..56301cfa7a 100644
--- a/src/java/org/apache/cassandra/db/rows/NativeCell.java
+++ b/src/java/org/apache/cassandra/db/rows/NativeCell.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
 import org.apache.cassandra.db.marshal.AddressBasedNativeData;
+import org.apache.cassandra.db.marshal.ByteArrayAccessor;
 import org.apache.cassandra.db.marshal.NativeAccessor;
 import org.apache.cassandra.db.marshal.NativeData;
 import org.apache.cassandra.db.marshal.ValueAccessor;
@@ -66,18 +67,37 @@ public class NativeCell extends AbstractCell<NativeData> 
implements NativeData
         this.peer = 0;
     }
 
-    public NativeCell(AddressBasedAllocator allocator,
-                      OpOrder.Group writeOp,
-                      Cell<?> cell)
+    public static NativeCell build(AddressBasedAllocator allocator,
+                                   OpOrder.Group writeOp,
+                                   Cell<?> cell)
     {
-        this(allocator,
-             writeOp,
-             cell.column(),
-             cell.timestamp(),
-             cell.ttl(),
-             cell.localDeletionTimeAsUnsignedInt(),
-             cell.buffer(),
-             cell.path());
+        if (cell.accessor() == ByteArrayAccessor.instance) // to avoid 
ByteBuffer allocation via cell.buffer()
+        {
+            byte[] value = cell.valueAsArray();
+            return new NativeCell(allocator,
+                                  writeOp,
+                                  cell.column(),
+                                  cell.timestamp(),
+                                  cell.ttl(),
+                                  cell.localDeletionTimeAsUnsignedInt(),
+                                  value,
+                                  value.length,
+                                  cell.path());
+        }
+        else
+        {
+            ByteBuffer byteBuffer = cell.buffer();
+            assert byteBuffer.order() == ByteOrder.BIG_ENDIAN;
+            return new NativeCell(allocator,
+                                  writeOp,
+                                  cell.column(),
+                                  cell.timestamp(),
+                                  cell.ttl(),
+                                  cell.localDeletionTimeAsUnsignedInt(),
+                                  byteBuffer,
+                                  byteBuffer.remaining(),
+                                  cell.path());
+        }
     }
 
     // Please keep both int/long overloaded ctors public. Otherwise silent 
casts will mess timestamps when one is not
@@ -91,7 +111,7 @@ public class NativeCell extends AbstractCell<NativeData> 
implements NativeData
                       ByteBuffer value,
                       CellPath path)
     {
-        this(allocator, writeOp, column, timestamp, ttl, 
deletionTimeLongToUnsignedInteger(localDeletionTime), value, path);
+        this(allocator, writeOp, column, timestamp, ttl, 
deletionTimeLongToUnsignedInteger(localDeletionTime), value, value.remaining(), 
path);
     }
 
     public NativeCell(AddressBasedAllocator allocator,
@@ -100,13 +120,13 @@ public class NativeCell extends AbstractCell<NativeData> 
implements NativeData
                       long timestamp,
                       int ttl,
                       int localDeletionTimeUnsignedInteger,
-                      ByteBuffer value,
+                      Object value,
+                      int valueLength,
                       CellPath path)
     {
         super(column);
-        long size = offHeapSizeWithoutPath(value.remaining());
+        long size = offHeapSizeWithoutPath(valueLength);
 
-        assert value.order() == ByteOrder.BIG_ENDIAN;
         assert column.isComplex() == (path != null);
         if (path != null)
         {
@@ -123,15 +143,20 @@ public class NativeCell extends AbstractCell<NativeData> 
implements NativeData
         NativeEndianMemoryUtil.setLong(peer + TIMESTAMP, timestamp);
         NativeEndianMemoryUtil.setInt(peer + TTL, ttl);
         NativeEndianMemoryUtil.setInt(peer + DELETION, 
localDeletionTimeUnsignedInteger);
-        NativeEndianMemoryUtil.setInt(peer + LENGTH, value.remaining());
-        MemoryUtil.setBytes(peer + VALUE, value);
+        NativeEndianMemoryUtil.setInt(peer + LENGTH, valueLength);
+        if (value instanceof byte[])
+            MemoryUtil.setBytes(peer + VALUE, (byte[]) value, 0, valueLength);
+        else if (value instanceof ByteBuffer)
+            MemoryUtil.setBytes(peer + VALUE, (ByteBuffer) value);
+        else
+            throw new IllegalArgumentException();
 
         if (path != null)
         {
             ByteBuffer pathbuffer = path.get(0);
             assert pathbuffer.order() == ByteOrder.BIG_ENDIAN;
 
-            long offset = peer + VALUE + value.remaining();
+            long offset = peer + VALUE + valueLength;
             NativeEndianMemoryUtil.setInt(offset, pathbuffer.remaining());
             MemoryUtil.setBytes(offset + 4, pathbuffer);
         }
diff --git a/src/java/org/apache/cassandra/fql/FullQueryLogger.java 
b/src/java/org/apache/cassandra/fql/FullQueryLogger.java
index abbfa558fe..a9697ba49f 100644
--- a/src/java/org/apache/cassandra/fql/FullQueryLogger.java
+++ b/src/java/org/apache/cassandra/fql/FullQueryLogger.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.fql;
 
-import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
@@ -268,7 +267,7 @@ public class FullQueryLogger implements QueryEvents.Listener
     public void batchSuccess(BatchStatement.Type type,
                              List<? extends CQLStatement> statements,
                              List<String> queries,
-                             List<List<ByteBuffer>> values,
+                             List<byte[][]> values,
                              QueryOptions queryOptions,
                              QueryState queryState,
                              long batchTimeMillis,
@@ -383,11 +382,11 @@ public class FullQueryLogger implements 
QueryEvents.Listener
         private final int weight;
         private final BatchStatement.Type batchType;
         private final List<String> queries;
-        private final List<List<ByteBuffer>> values;
+        private final List<byte[][]> values;
 
         public Batch(BatchStatement.Type batchType,
                      List<String> queries,
-                     List<List<ByteBuffer>> values,
+                     List<byte[][]> values,
                      QueryOptions queryOptions,
                      QueryState queryState,
                      long batchTimeMillis)
@@ -406,11 +405,11 @@ public class FullQueryLogger implements 
QueryEvents.Listener
                 queriesSize += ObjectSizes.sizeOf(checkNotNull(query));
 
             long valuesSize = EMPTY_LIST_SIZE + 
ObjectSizes.sizeOfReferenceArray(values.size());
-            for (List<ByteBuffer> subValues : values)
+            for (byte[][] subValues : values)
             {
-                valuesSize += EMPTY_LIST_SIZE + 
ObjectSizes.sizeOfReferenceArray(subValues.size());
-                for (ByteBuffer subValue : subValues)
-                    valuesSize += ObjectSizes.sizeOnHeapOf(subValue);
+                valuesSize += EMPTY_LIST_SIZE + 
ObjectSizes.sizeOfReferenceArray(subValues.length);
+                for (byte[] subValue : subValues)
+                    valuesSize += ObjectSizes.sizeOfArray(subValue);
             }
 
             // No need to add the batch type which is an enum.
@@ -450,10 +449,10 @@ public class FullQueryLogger implements 
QueryEvents.Listener
             }
             valueOut = wire.write(VALUES);
             valueOut.int32(values.size());
-            for (List<ByteBuffer> subValues : values)
+            for (byte[][] subValues : values)
             {
-                valueOut.int32(subValues.size());
-                for (ByteBuffer value : subValues)
+                valueOut.int32(subValues.length);
+                for (byte[] value : subValues)
                     valueOut.bytes(value == null ? null : 
BytesStore.wrap(value));
             }
         }
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java 
b/src/java/org/apache/cassandra/transport/CBUtil.java
index 1c73206c67..3c6a3a9011 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -42,6 +42,7 @@ import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.utils.ByteArrayUtil;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.LazyToString;
 import org.apache.cassandra.utils.Pair;
@@ -568,6 +569,35 @@ public abstract class CBUtil
         return l;
     }
 
+    public static byte[][] readValueListAsByteArrays(ByteBuf cb, 
ProtocolVersion protocolVersion)
+    {
+        int size = cb.readUnsignedShort();
+        if (size == 0)
+            return ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS;
+
+        byte[][] l = new byte[size][];
+        for (int i = 0; i < size; i++)
+            l[i] = readBoundValueAsByteArray(cb, protocolVersion);
+        return l;
+    }
+
+    public static byte[] readBoundValueAsByteArray(ByteBuf cb, ProtocolVersion 
protocolVersion)
+    {
+        int length = cb.readInt();
+        if (length < 0)
+        {
+            if (protocolVersion.isSmallerThan(ProtocolVersion.V4)) // backward 
compatibility for pre-version 4
+                return null;
+            if (length == -1)
+                return null;
+            else if (length == -2)
+                return ByteArrayUtil.UNSET_BYTE_ARRAY;
+            else
+                throw new ProtocolException("Invalid ByteBuf length " + 
length);
+        }
+        return readRawBytes(cb, length);
+    }
+
     public static void writeValueList(List<ByteBuffer> values, ByteBuf cb)
     {
         cb.writeShort(values.size());
@@ -575,6 +605,13 @@ public abstract class CBUtil
             CBUtil.writeValue(value, cb);
     }
 
+    public static void writeValueListOfByteArrays(byte[][] values, ByteBuf cb)
+    {
+        cb.writeShort(values.length);
+        for (byte[] value : values)
+            CBUtil.writeValue(value, cb);
+    }
+
     public static int sizeOfValueList(List<ByteBuffer> values)
     {
         int size = 2;
@@ -583,6 +620,14 @@ public abstract class CBUtil
         return size;
     }
 
+    public static int sizeOfValueListOfByteArrays(byte[][] values)
+    {
+        int size = 2;
+        for (byte[] value : values)
+            size += CBUtil.sizeOfValue(value);
+        return size;
+    }
+
     public static Pair<List<String>, List<ByteBuffer>> 
readNameAndValueList(ByteBuf cb, ProtocolVersion protocolVersion)
     {
         int size = cb.readUnsignedShort();
@@ -599,6 +644,22 @@ public abstract class CBUtil
         return Pair.create(s, l);
     }
 
+    public static Pair<List<String>, byte[][]> 
readNameAndValueListAsByteArrays(ByteBuf cb, ProtocolVersion protocolVersion)
+    {
+        int size = cb.readUnsignedShort();
+        if (size == 0)
+            return Pair.create(Collections.<String>emptyList(), 
ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS);
+
+        List<String> s = new ArrayList<>(size);
+        byte[][] l = new byte[size][];
+        for (int i = 0; i < size; i++)
+        {
+            s.add(readString(cb));
+            l[i] = readBoundValueAsByteArray(cb, protocolVersion);
+        }
+        return Pair.create(s, l);
+    }
+
     public static InetSocketAddress readInet(ByteBuf cb)
     {
         int addrSize = cb.readByte() & 0xFF;
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java 
b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index d45105f109..5c0e09be42 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.transport.messages;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -58,7 +57,7 @@ public class BatchMessage extends Message.Request
             byte type = body.readByte();
             int n = body.readUnsignedShort();
             List<Object> queryOrIds = new ArrayList<>(n);
-            List<List<ByteBuffer>> variables = new ArrayList<>(n);
+            List<byte[][]> variables = new ArrayList<>(n);
             for (int i = 0; i < n; i++)
             {
                 byte kind = body.readByte();
@@ -68,7 +67,7 @@ public class BatchMessage extends Message.Request
                     queryOrIds.add(MD5Digest.wrap(CBUtil.readBytes(body)));
                 else
                     throw new ProtocolException("Invalid query kind in BATCH 
messages. Must be 0 or 1 but got " + kind);
-                variables.add(CBUtil.readValueList(body, version));
+                variables.add(CBUtil.readValueListAsByteArrays(body, version));
             }
             QueryOptions options = QueryOptions.codec.decode(body, version);
 
@@ -91,7 +90,7 @@ public class BatchMessage extends Message.Request
                 else
                     CBUtil.writeBytes(((MD5Digest)q).bytes, dest);
 
-                CBUtil.writeValueList(msg.values.get(i), dest);
+                CBUtil.writeValueListOfByteArrays(msg.values.get(i), dest);
             }
 
             if (version.isSmallerThan(ProtocolVersion.V3))
@@ -110,7 +109,7 @@ public class BatchMessage extends Message.Request
                              ? CBUtil.sizeOfLongString((String)q)
                              : CBUtil.sizeOfBytes(((MD5Digest)q).bytes));
 
-                size += CBUtil.sizeOfValueList(msg.values.get(i));
+                size += CBUtil.sizeOfValueListOfByteArrays(msg.values.get(i));
             }
             size += version.isSmallerThan(ProtocolVersion.V3)
                   ? CBUtil.sizeOfConsistencyLevel(msg.options.getConsistency())
@@ -145,10 +144,10 @@ public class BatchMessage extends Message.Request
 
     public final BatchStatement.Type batchType;
     public final List<Object> queryOrIdList;
-    public final List<List<ByteBuffer>> values;
+    public final List<byte[][]> values;
     public final QueryOptions options;
 
-    public BatchMessage(BatchStatement.Type type, List<Object> queryOrIdList, 
List<List<ByteBuffer>> values, QueryOptions options)
+    public BatchMessage(BatchStatement.Type type, List<Object> queryOrIdList, 
List<byte[][]> values, QueryOptions options)
     {
         super(Message.Type.BATCH);
         this.batchType = type;
@@ -197,11 +196,11 @@ public class BatchMessage extends Message.Request
                         throw new 
PreparedQueryNotFoundException((MD5Digest)query);
                 }
 
-                List<ByteBuffer> queryValues = values.get(i);
-                if (queryValues.size() != 
p.statement.getBindVariables().size())
+                byte[][] queryValues = values.get(i);
+                if (queryValues.length != 
p.statement.getBindVariables().size())
                     throw new InvalidRequestException(String.format("There 
were %d markers(?) in CQL but %d bound variables",
                                                                     
p.statement.getBindVariables().size(),
-                                                                    
queryValues.size()));
+                                                                    
queryValues.length));
 
                 prepared.add(p);
             }
@@ -260,7 +259,7 @@ public class BatchMessage extends Message.Request
         for (int i = 0; i < queryOrIdList.size(); i++)
         {
             if (i > 0) sb.append(", ");
-            sb.append(queryOrIdList.get(i)).append(" with 
").append(values.get(i).size()).append(" values");
+            sb.append(queryOrIdList.get(i)).append(" with 
").append(values.get(i).length).append(" values");
         }
         sb.append("] at consistency ").append(options.getConsistency());
         return sb.toString();
diff --git a/src/java/org/apache/cassandra/utils/ByteArrayUtil.java 
b/src/java/org/apache/cassandra/utils/ByteArrayUtil.java
index f0e797c105..13fb44c3bb 100644
--- a/src/java/org/apache/cassandra/utils/ByteArrayUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteArrayUtil.java
@@ -33,6 +33,8 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 public class ByteArrayUtil
 {
     public static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+    public static final byte[][] EMPTY_ARRAY_OF_BYTE_ARRAYS = new byte[0][];
+    public static final byte[] UNSET_BYTE_ARRAY = new byte[0];
 
     public static int compareUnsigned(byte[] o1, byte[] o2)
     {
@@ -277,4 +279,16 @@ public class ByteArrayUtil
     {
         FastByteOperations.copy(src, srcPos, dst, dstPos, length);
     }
+
+    /** Converts a bind value from byte[] to ByteBuffer form: null stays null, the UNSET_BYTE_ARRAY and EMPTY_BYTE_ARRAY instances (matched by reference, since both are zero-length arrays) map to their ByteBufferUtil counterparts, and any other array is wrapped rather than copied. */
+    public static ByteBuffer convertToByteBufferValue(byte[] arrayValue)
+    {
+        if (arrayValue == null)
+            return null;
+        if (arrayValue == ByteArrayUtil.UNSET_BYTE_ARRAY)
+            return ByteBufferUtil.UNSET_BYTE_BUFFER; // it is important for 
correctness to preserve unset value
+        if (arrayValue == ByteArrayUtil.EMPTY_BYTE_ARRAY)
+            return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+        return ByteBuffer.wrap(arrayValue);
+    }
 }
diff --git a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java 
b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
index 22ee46c68c..cc30e7c4b7 100644
--- a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
@@ -87,7 +87,7 @@ public class NativeAllocator extends MemtableAllocator 
implements AddressBasedAl
         @Override
         public void addCell(Cell<?> cell)
         {
-            super.addCell(new NativeCell(allocator, writeOp, cell));
+            super.addCell(NativeCell.build(allocator, writeOp, cell));
         }
     }
 
@@ -153,7 +153,7 @@ public class NativeAllocator extends MemtableAllocator 
implements AddressBasedAl
         @Override
         public Cell<?> clone(Cell<?> cell)
         {
-            return new NativeCell(this, opGroup, cell);
+            return NativeCell.build(this, opGroup, cell);
         }
 
         @Override
diff --git 
a/test/microbench/org/apache/cassandra/test/microbench/BatchStatementBench.java 
b/test/microbench/org/apache/cassandra/test/microbench/BatchStatementBench.java
index dd3e1a1d34..98abda9cda 100644
--- 
a/test/microbench/org/apache/cassandra/test/microbench/BatchStatementBench.java
+++ 
b/test/microbench/org/apache/cassandra/test/microbench/BatchStatementBench.java
@@ -18,14 +18,11 @@
 
 package org.apache.cassandra.test.microbench;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.collect.Lists;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.Attributes;
 import org.apache.cassandra.cql3.BatchQueryOptions;
@@ -44,6 +41,7 @@ import org.apache.cassandra.schema.SchemaTestUtil;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.transport.Dispatcher;
+import org.apache.cassandra.utils.ByteArrayUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -64,8 +62,6 @@ import org.openjdk.jmh.runner.Runner;
 import org.openjdk.jmh.runner.options.Options;
 import org.openjdk.jmh.runner.options.OptionsBuilder;
 
-import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
-
 @BenchmarkMode(Mode.Throughput)
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
 @Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@@ -108,14 +104,14 @@ public class BatchStatementBench
         
SchemaTestUtil.addOrUpdateKeyspace(ksm.withSwapped(ksm.tables.with(metadata)), 
false);
 
         List<ModificationStatement> modifications = new ArrayList<>(batchSize);
-        List<List<ByteBuffer>> parameters = new ArrayList<>(batchSize);
+        List<byte[][]> parameters = new ArrayList<>(batchSize);
         List<Object> queryOrIdList = new ArrayList<>(batchSize);
         QueryHandler.Prepared prepared = 
QueryProcessor.prepareInternal(String.format("INSERT INTO %s.%s (id, ck, v) 
VALUES (?,?,?)", keyspace, table));
 
         for (int i = 0; i < batchSize; i++)
         {
             modifications.add((ModificationStatement) prepared.statement);
-            parameters.add(Lists.newArrayList(bytes(uniquePartition ? i : 1), 
bytes(i), bytes(i)));
+            parameters.add(new byte[][] {ByteArrayUtil.bytes(uniquePartition ? 
i : 1), ByteArrayUtil.bytes(i), ByteArrayUtil.bytes(i)});
             queryOrIdList.add(prepared.rawCQLStatement);
         }
         bs = new BatchStatement(BatchStatement.Type.UNLOGGED, 
VariableSpecifications.empty(), modifications, Attributes.none());
diff --git a/test/unit/org/apache/cassandra/cql3/QueryEventsTest.java 
b/test/unit/org/apache/cassandra/cql3/QueryEventsTest.java
index dd1cb45760..1b97d1eb71 100644
--- a/test/unit/org/apache/cassandra/cql3/QueryEventsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/QueryEventsTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.cql3;
 
-import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -39,8 +38,9 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.Message;
 import org.apache.cassandra.transport.messages.ResultMessage;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ByteArrayUtil;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -144,9 +144,9 @@ public class QueryEventsTest extends CQLTester
         assertEquals(3, listener.queries.size());
         assertEquals(BatchStatement.Type.UNLOGGED, listener.batchType);
         assertEquals(newArrayList(q1, q1, q2), listener.queries);
-        assertEquals(newArrayList(newArrayList(ByteBufferUtil.bytes(1), 
ByteBufferUtil.bytes(1)),
-                                  newArrayList(ByteBufferUtil.bytes(2), 
ByteBufferUtil.bytes(2)),
-                                  newArrayList(newArrayList())), 
listener.values);
+        assertListOfArraysEquals(newArrayList(new byte[][] 
{ByteArrayUtil.bytes(1), ByteArrayUtil.bytes(1)},
+                                              new byte[][] 
{ByteArrayUtil.bytes(2), ByteArrayUtil.bytes(2)},
+                                              new byte[][] {}), 
listener.values);
 
         batch.add(new SimpleStatement("insert into abc.def (id, v) values 
(1,1)"));
         try
@@ -162,10 +162,17 @@ public class QueryEventsTest extends CQLTester
         assertEquals(3, listener.queries.size());
         assertEquals(BatchStatement.Type.UNLOGGED, listener.batchType);
         assertEquals(newArrayList(q1, q1, q2), listener.queries);
-        assertEquals(newArrayList(newArrayList(ByteBufferUtil.bytes(1), 
ByteBufferUtil.bytes(1)),
-                                  newArrayList(ByteBufferUtil.bytes(2), 
ByteBufferUtil.bytes(2)),
-                                  newArrayList(newArrayList()),
-                                  newArrayList(newArrayList())), 
listener.values);
+        assertListOfArraysEquals(newArrayList(new byte[][] { 
ByteArrayUtil.bytes(1), ByteArrayUtil.bytes(1)},
+                                              new byte[][] 
{ByteArrayUtil.bytes(2), ByteArrayUtil.bytes(2)},
+                                              new byte[][] {},
+                                              new byte[][] {}), 
listener.values);
+    }
+
+    private static void assertListOfArraysEquals(List<byte[][]> expected, 
List<byte[][]> actual)
+    {
+        assertEquals(expected.size(), actual.size());
+        for (int i = 0; i < expected.size(); i++)
+            assertArrayEquals(expected.get(i), actual.get(i));
     }
 
     @Test
@@ -323,7 +330,7 @@ public class QueryEventsTest extends CQLTester
     private static class BatchMockListener extends MockListener
     {
         private List<String> queries;
-        private List<List<ByteBuffer>> values;
+        private List<byte[][]> values;
         private BatchStatement.Type batchType;
 
         BatchMockListener(ColumnFamilyStore currentColumnFamilyStore)
@@ -331,7 +338,7 @@ public class QueryEventsTest extends CQLTester
             super(currentColumnFamilyStore);
         }
 
-        public void batchSuccess(BatchStatement.Type batchType, List<? extends 
CQLStatement> statements, List<String> queries, List<List<ByteBuffer>> values, 
QueryOptions options, QueryState state, long queryTime, Message.Response 
response)
+        public void batchSuccess(BatchStatement.Type batchType, List<? extends 
CQLStatement> statements, List<String> queries, List<byte[][]> values, 
QueryOptions options, QueryState state, long queryTime, Message.Response 
response)
         {
             inc("batchSuccess");
             this.queries = queries;
@@ -340,7 +347,7 @@ public class QueryEventsTest extends CQLTester
             this.queryTime = queryTime;
         }
 
-        public void batchFailure(BatchStatement.Type batchType, List<? extends 
CQLStatement> statements, List<String> queries, List<List<ByteBuffer>> values, 
QueryOptions options, QueryState state, Exception cause)
+        public void batchFailure(BatchStatement.Type batchType, List<? extends 
CQLStatement> statements, List<String> queries, List<byte[][]> values, 
QueryOptions options, QueryState state, Exception cause)
         {
             inc("batchFailure");
             this.queries = queries;
diff --git a/test/unit/org/apache/cassandra/fql/FullQueryLoggerTest.java 
b/test/unit/org/apache/cassandra/fql/FullQueryLoggerTest.java
index e95eb0c8b4..ec3ac0df91 100644
--- a/test/unit/org/apache/cassandra/fql/FullQueryLoggerTest.java
+++ b/test/unit/org/apache/cassandra/fql/FullQueryLoggerTest.java
@@ -58,6 +58,7 @@ import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ByteArrayUtil;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.binlog.BinLogTest;
 
@@ -462,9 +463,9 @@ public class FullQueryLoggerTest extends CQLTester
         configureFQL();
         logBatch(Type.UNLOGGED,
                  Arrays.asList("foo1", "foo2"),
-                 Arrays.asList(Arrays.asList(ByteBuffer.allocate(1),
-                                             ByteBuffer.allocateDirect(2)),
-                               Collections.emptyList()),
+                 Arrays.asList(new byte[][] {new byte[1],
+                                             new byte[2]},
+                               ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS),
                  QueryOptions.DEFAULT,
                  queryState("abcdefgh"),
                  1);
@@ -486,9 +487,9 @@ public class FullQueryLoggerTest extends CQLTester
         configureFQL();
         logBatch(Type.UNLOGGED,
                  Arrays.asList("foo1", "foo2"),
-                 Arrays.asList(Arrays.asList(ByteBuffer.allocate(1),
-                                             ByteBuffer.allocateDirect(2)),
-                               Collections.emptyList()),
+                 Arrays.asList(new byte[][] {new byte[1],
+                                             new byte[2]},
+                               ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS),
                  QueryOptions.DEFAULT,
                  queryState(),
                  1);
@@ -599,12 +600,10 @@ public class FullQueryLoggerTest extends CQLTester
 
         bigList = null;
         //The size of the list of values should be reflected
-        List<List<ByteBuffer>> bigValues = new ArrayList<>(100000);
+        List<byte[][]> bigValues = new ArrayList<>(100000);
         for (int ii = 0; ii < 100000; ii++)
-        {
-            bigValues.add(new ArrayList<>(0));
-        }
-        bigValues.get(0).add(ByteBuffer.allocate(1024 * 1024 * 5));
+            bigValues.add(ii == 0 ? new byte[][] {new byte[1024 * 1024 * 5]} : 
new byte[][]{});
+
         batch = new Batch(Type.UNLOGGED, new ArrayList<>(), bigValues, 
QueryOptions.DEFAULT, queryState(), 1);
         assertTrue(batch.weight() > ObjectSizes.measureDeep(bigValues));
 
@@ -642,7 +641,7 @@ public class FullQueryLoggerTest extends CQLTester
     @Test(expected = NullPointerException.class)
     public void testLogBatchNullValuesValue() throws Exception
     {
-        logBatch(Type.UNLOGGED, new ArrayList<>(), 
Arrays.asList((List<ByteBuffer>)null), null, queryState(), 1);
+        logBatch(Type.UNLOGGED, new ArrayList<>(), 
Collections.singletonList(null), null, queryState(), 1);
     }
 
     @Test(expected = NullPointerException.class)
@@ -737,7 +736,7 @@ public class FullQueryLoggerTest extends CQLTester
 
     private void logBatch(BatchStatement.Type type,
                           List<String> queries,
-                          List<List<ByteBuffer>> values,
+                          List<byte[][]> values,
                           QueryOptions options,
                           QueryState queryState,
                           long time)
diff --git 
a/test/unit/org/apache/cassandra/metrics/ClientRequestRowAndColumnMetricsTest.java
 
b/test/unit/org/apache/cassandra/metrics/ClientRequestRowAndColumnMetricsTest.java
index b94ed8db48..11ec07ba99 100644
--- 
a/test/unit/org/apache/cassandra/metrics/ClientRequestRowAndColumnMetricsTest.java
+++ 
b/test/unit/org/apache/cassandra/metrics/ClientRequestRowAndColumnMetricsTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.metrics;
 
-import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 
@@ -37,6 +36,7 @@ import org.apache.cassandra.service.paxos.Paxos;
 import org.apache.cassandra.transport.SimpleClient;
 import org.apache.cassandra.transport.messages.BatchMessage;
 import org.apache.cassandra.transport.messages.QueryMessage;
+import org.apache.cassandra.utils.ByteArrayUtil;
 
 import static 
org.apache.cassandra.metrics.CassandraMetricsRegistry.METRIC_SCOPE_UNDEFINED;
 import static org.apache.cassandra.transport.ProtocolVersion.CURRENT;
@@ -261,7 +261,7 @@ public class ClientRequestRowAndColumnMetricsTest extends 
CQLTester
             String first = String.format("INSERT INTO %s.%s (pk, v1, v2) 
VALUES (1, 10, 100)", KEYSPACE, currentTable());
             String second = String.format("INSERT INTO %s.%s (pk, v1, v2) 
VALUES (2, 20, 200)", KEYSPACE, currentTable());
 
-            List<List<ByteBuffer>> values = 
ImmutableList.of(Collections.emptyList(), Collections.emptyList());
+            List<byte[][]> values = 
ImmutableList.of(ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS, 
ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS);
             BatchMessage batch = new BatchMessage(BatchStatement.Type.LOGGED, 
ImmutableList.of(first, second), values, QueryOptions.DEFAULT);
             client.execute(batch);
 
@@ -316,7 +316,7 @@ public class ClientRequestRowAndColumnMetricsTest extends 
CQLTester
             String first = String.format("INSERT INTO %s.%s (pk, ck, v0, v1, 
v2) VALUES (0, 1, 2, 3, 4)", KEYSPACE, currentTable());
             String second = String.format("INSERT INTO %s.%s (pk, ck, v0, v1, 
v2) VALUES (0, 2, 3, 5, 6)", KEYSPACE, currentTable());
 
-            List<List<ByteBuffer>> values = 
ImmutableList.of(Collections.emptyList(), Collections.emptyList());
+            List<byte[][]> values = 
ImmutableList.of(ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS, 
ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS);
             BatchMessage batch = new BatchMessage(BatchStatement.Type.LOGGED, 
ImmutableList.of(first, second), values, QueryOptions.DEFAULT);
             client.execute(batch);
 
@@ -377,7 +377,7 @@ public class ClientRequestRowAndColumnMetricsTest extends 
CQLTester
             String first = String.format("INSERT INTO %s.%s (pk, ck, v1, v2) 
VALUES (1, 2, 3, 4)", KEYSPACE, currentTable());
             String second = String.format("DELETE FROM %s.%s WHERE pk = 1 AND 
ck > 1", KEYSPACE, currentTable());
 
-            List<List<ByteBuffer>> values = 
ImmutableList.of(Collections.emptyList(), Collections.emptyList());
+            List<byte[][]> values = 
ImmutableList.of(ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS, 
ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS);
             BatchMessage batch = new BatchMessage(BatchStatement.Type.LOGGED, 
ImmutableList.of(first, second), values, QueryOptions.DEFAULT);
             client.execute(batch);
 
diff --git a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java 
b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
index 42c6cbd021..e7889fca0d 100644
--- a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
+++ b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
@@ -46,6 +46,7 @@ import org.apache.cassandra.transport.messages.ExecuteMessage;
 import org.apache.cassandra.transport.messages.PrepareMessage;
 import org.apache.cassandra.transport.messages.QueryMessage;
 import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.ByteArrayUtil;
 import org.apache.cassandra.utils.MD5Digest;
 import org.apache.cassandra.utils.ReflectionUtils;
 
@@ -172,7 +173,7 @@ public class MessagePayloadTest extends CQLTester
 
                 BatchMessage batchMessage = new 
BatchMessage(BatchStatement.Type.UNLOGGED,
                                                              
Collections.<Object>singletonList("INSERT INTO " + KEYSPACE + ".atable (pk,v) 
VALUES (1, 'foo')"),
-                                                             
Collections.singletonList(Collections.<ByteBuffer>emptyList()),
+                                                             
Collections.singletonList(ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS),
                                                              queryOptions);
                 reqMap = Collections.singletonMap("foo", bytes(45));
                 responsePayload = respMap = Collections.singletonMap("bar", 
bytes(45));
@@ -241,7 +242,7 @@ public class MessagePayloadTest extends CQLTester
 
                 BatchMessage batchMessage = new 
BatchMessage(BatchStatement.Type.UNLOGGED,
                                                              
Collections.<Object>singletonList("INSERT INTO " + KEYSPACE + ".atable (pk,v) 
VALUES (1, 'foo')"),
-                                                             
Collections.singletonList(Collections.<ByteBuffer>emptyList()),
+                                                             
Collections.singletonList(ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS),
                                                              
QueryOptions.DEFAULT);
                 reqMap = Collections.singletonMap("foo", bytes(45));
                 responsePayload = respMap = Collections.singletonMap("bar", 
bytes(45));
@@ -342,7 +343,7 @@ public class MessagePayloadTest extends CQLTester
 
                 BatchMessage batchMessage = new 
BatchMessage(BatchStatement.Type.UNLOGGED,
                                                              
Collections.<Object>singletonList("INSERT INTO " + KEYSPACE + ".atable (pk,v) 
VALUES (1, 'foo')"),
-                                                             
Collections.singletonList(Collections.<ByteBuffer>emptyList()),
+                                                             
Collections.singletonList(ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS),
                                                              
QueryOptions.DEFAULT);
                 reqMap = Collections.singletonMap("foo", bytes(45));
                 responsePayload = Collections.singletonMap("bar", bytes(45));
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQuery.java 
b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQuery.java
index 7358ad876e..8a8ca317a0 100644
--- a/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQuery.java
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQuery.java
@@ -244,11 +244,15 @@ public abstract class FQLQuery implements 
Comparable<FQLQuery>
         public BinLog.ReleaseableWriteMarshallable toMarshallable()
         {
             List<String> queryStrings = new ArrayList<>();
-            List<List<ByteBuffer>> values = new ArrayList<>();
+            List<byte[][]> values = new ArrayList<>();
             for (Single q : queries)
             {
                 queryStrings.add(q.query);
-                values.add(q.values);
+                byte[][] valuesAsArray = new byte[q.values.size()][]; // one byte[] per bind value of this statement
+                int i = 0;
+                for (ByteBuffer value : q.values) // copy only position..limit; array() ignores both and throws for direct/read-only buffers
+                    valuesAsArray[i++] = org.apache.cassandra.utils.ByteBufferUtil.getArray(value);
+                values.add(valuesAsArray);
             }
             return new 
FullQueryLogger.Batch(org.apache.cassandra.cql3.statements.BatchStatement.Type.valueOf(batchType.name()),
 queryStrings, values, queryOptions, queryState, queryStartTime);
         }
diff --git 
a/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLReplayTest.java 
b/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLReplayTest.java
index 894bfc40da..ef8863a683 100644
--- a/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLReplayTest.java
+++ b/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLReplayTest.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.fqltool.commands.Replay;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.tools.Util;
+import org.apache.cassandra.utils.ByteArrayUtil;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.MergeIterator;
 import org.apache.cassandra.utils.Pair;
@@ -705,11 +706,11 @@ public class FQLReplayTest
                 {
                     int batchSize = random ? r.nextInt(99) + 1 : i + 1;
                     List<String> queries = new ArrayList<>(batchSize);
-                    List<List<ByteBuffer>> values = new ArrayList<>(batchSize);
+                    List<byte[][]> values = new ArrayList<>(batchSize);
                     for (int jj = 0; jj < (random ? r.nextInt(batchSize) : 
10); jj++)
                     {
                         queries.add("aaaaaa batch "+i+":"+jj);
-                        values.add(Collections.emptyList());
+                        values.add(ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS);
                     }
                     FullQueryLogger.Batch batch = new 
FullQueryLogger.Batch(BatchStatement.Type.UNLOGGED,
                                                                             
queries,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to