This is an automated email from the ASF dual-hosted git repository.
konstantinov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8a72868066 Use byte[] directly in QueryOptions instead of ByteBuffer
and convert them to ArrayCell instead of BufferCell.
8a72868066 is described below
commit 8a7286806620928edcff733e2f6d6d7b0e663619
Author: Dmitry Konstantinov <[email protected]>
AuthorDate: Sun Dec 29 12:05:48 2024 +0000
Use byte[] directly in QueryOptions instead of ByteBuffer and convert them
to ArrayCell instead of BufferCell.
Additionally replace List with array for bind values (we know the size in
advance during a decoding), so in total: List<List> is replaced with byte[][]
QueryOptions classes support both ways to get values now: using an old API with
ByteBuffer and a new API with byte[].
Patch by Dmitry Konstantinov; reviewed by Michael Semb Wever for
CASSANDRA-20166
---
CHANGES.txt | 1 +
.../apache/cassandra/audit/AuditLogManager.java | 5 +-
.../apache/cassandra/cql3/BatchQueryOptions.java | 62 +++++++++---
.../org/apache/cassandra/cql3/QueryEvents.java | 17 +++-
.../org/apache/cassandra/cql3/QueryOptions.java | 108 +++++++++++++++++++--
.../org/apache/cassandra/cql3/QueryProcessor.java | 12 +--
.../apache/cassandra/cql3/UpdateParameters.java | 46 ++++++---
.../org/apache/cassandra/cql3/terms/Constants.java | 22 ++++-
.../org/apache/cassandra/cql3/terms/Marker.java | 38 +++++++-
src/java/org/apache/cassandra/cql3/terms/Term.java | 18 ++++
.../org/apache/cassandra/db/marshal/ListType.java | 12 +++
.../org/apache/cassandra/db/marshal/MapType.java | 28 ++++--
.../cassandra/db/marshal/MultiElementType.java | 2 +
.../org/apache/cassandra/db/marshal/SetType.java | 18 +++-
.../org/apache/cassandra/db/marshal/TupleType.java | 18 +++-
.../org/apache/cassandra/db/marshal/UserType.java | 18 +++-
.../apache/cassandra/db/marshal/VectorType.java | 20 +++-
.../org/apache/cassandra/db/rows/ArrayCell.java | 12 +++
src/java/org/apache/cassandra/db/rows/Cell.java | 5 +
.../org/apache/cassandra/db/rows/NativeCell.java | 61 ++++++++----
.../org/apache/cassandra/fql/FullQueryLogger.java | 21 ++--
.../org/apache/cassandra/transport/CBUtil.java | 61 ++++++++++++
.../cassandra/transport/messages/BatchMessage.java | 21 ++--
.../org/apache/cassandra/utils/ByteArrayUtil.java | 14 +++
.../cassandra/utils/memory/NativeAllocator.java | 4 +-
.../test/microbench/BatchStatementBench.java | 10 +-
.../org/apache/cassandra/cql3/QueryEventsTest.java | 31 +++---
.../apache/cassandra/fql/FullQueryLoggerTest.java | 25 +++--
.../ClientRequestRowAndColumnMetricsTest.java | 8 +-
.../cassandra/transport/MessagePayloadTest.java | 7 +-
.../src/org/apache/cassandra/fqltool/FQLQuery.java | 8 +-
.../apache/cassandra/fqltool/FQLReplayTest.java | 5 +-
32 files changed, 578 insertions(+), 160 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 2391d10537..9cac450533 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Use byte[] directly in QueryOptions instead of ByteBuffer and convert them
to ArrayCell instead of BufferCell to reduce allocations (CASSANDRA-20166)
* Log queries scanning too many SSTables per read (CASSANDRA-21048)
* Extend nodetool verify to (optionally) validate SAI files (CASSANDRA-20949)
* Fix CompressionDictionary being closed while still in use (CASSANDRA-21047)
diff --git a/src/java/org/apache/cassandra/audit/AuditLogManager.java
b/src/java/org/apache/cassandra/audit/AuditLogManager.java
index f0e44f6f84..c453dcbb80 100644
--- a/src/java/org/apache/cassandra/audit/AuditLogManager.java
+++ b/src/java/org/apache/cassandra/audit/AuditLogManager.java
@@ -22,7 +22,6 @@ import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
-import java.nio.ByteBuffer;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.Principal;
@@ -317,7 +316,7 @@ public class AuditLogManager implements
QueryEvents.Listener, AuthEvents.Listene
public void batchSuccess(BatchStatement.Type batchType,
List<? extends CQLStatement> statements,
List<String> queries,
- List<List<ByteBuffer>> values,
+ List<byte[][]> values,
QueryOptions options,
QueryState state,
long queryStartTimeMillis,
@@ -358,7 +357,7 @@ public class AuditLogManager implements
QueryEvents.Listener, AuthEvents.Listene
}
}
- public void batchFailure(BatchStatement.Type batchType, List<? extends
CQLStatement> statements, List<String> queries, List<List<ByteBuffer>> values,
QueryOptions options, QueryState state, Exception cause)
+ public void batchFailure(BatchStatement.Type batchType, List<? extends
CQLStatement> statements, List<String> queries, List<byte[][]> values,
QueryOptions options, QueryState state, Exception cause)
{
String auditMessage = String.format("BATCH of %d statements at
consistency %s", statements.size(), options.getConsistency());
AuditLogEntry entry = new
AuditLogEntry.Builder(state).setOperation(auditMessage)
diff --git a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
index dc38c4d0ce..1e9ecc87e0 100644
--- a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
@@ -31,6 +31,8 @@ import org.apache.cassandra.service.QueryState;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
+import static
org.apache.cassandra.utils.ByteArrayUtil.convertToByteBufferValue;
+
public abstract class BatchQueryOptions
{
public static BatchQueryOptions DEFAULT =
withoutPerStatementVariables(QueryOptions.DEFAULT);
@@ -49,7 +51,7 @@ public abstract class BatchQueryOptions
return new WithoutPerStatementVariables(options,
Collections.<Object>emptyList());
}
- public static BatchQueryOptions withPerStatementVariables(QueryOptions
options, List<List<ByteBuffer>> variables, List<Object> queryOrIdList)
+ public static BatchQueryOptions withPerStatementVariables(QueryOptions
options, List<byte[][]> variables, List<Object> queryOrIdList)
{
return new WithPerStatementVariables(options, variables,
queryOrIdList);
}
@@ -91,6 +93,50 @@ public abstract class BatchQueryOptions
return wrapped.getNowInSeconds(state);
}
+ private static class BatchQueryOptionsWrapper extends
QueryOptions.QueryOptionsWrapper {
+ private final byte[][] valuesAsByteArray;
+ private List<ByteBuffer> values; // initialized on demand
+
+ BatchQueryOptionsWrapper(QueryOptions wrapped, byte[][] vars)
+ {
+ super(wrapped);
+ this.valuesAsByteArray = vars;
+ }
+ public List<ByteBuffer> getValues()
+ {
+ if (values == null)
+ {
+ values = new ArrayList<>(valuesAsByteArray.length);
+ for (byte[] byteArrayValue : valuesAsByteArray)
+ values.add(convertToByteBufferValue(byteArrayValue));
+ }
+ return values;
+ }
+
+ public int getValuesSize()
+ {
+ return valuesAsByteArray.length;
+ }
+
+ public ByteBuffer getValue(int index)
+ {
+ if (values == null) // we convert values to ByteBuffer in a lazy
way, on demand
+ return convertToByteBufferValue(valuesAsByteArray[index]);
+ else
+ return values.get(index);
+ }
+
+ public boolean isByteArrayValuesGetSupported()
+ {
+ return true;
+ }
+
+ public byte[][] getByteArrayValues()
+ {
+ return valuesAsByteArray;
+ }
+ }
+
private static class WithoutPerStatementVariables extends BatchQueryOptions
{
private WithoutPerStatementVariables(QueryOptions wrapped,
List<Object> queryOrIdList)
@@ -108,20 +154,12 @@ public abstract class BatchQueryOptions
{
private final List<QueryOptions> perStatementOptions;
- private WithPerStatementVariables(QueryOptions wrapped,
List<List<ByteBuffer>> variables, List<Object> queryOrIdList)
+ private WithPerStatementVariables(QueryOptions wrapped, List<byte[][]>
variables, List<Object> queryOrIdList)
{
super(wrapped, queryOrIdList);
this.perStatementOptions = new ArrayList<>(variables.size());
- for (final List<ByteBuffer> vars : variables)
- {
- perStatementOptions.add(new
QueryOptions.QueryOptionsWrapper(wrapped)
- {
- public List<ByteBuffer> getValues()
- {
- return vars;
- }
- });
- }
+ for (final byte[][] vars : variables)
+ perStatementOptions.add(new BatchQueryOptionsWrapper(wrapped,
vars));
}
public QueryOptions forStatement(int i)
diff --git a/src/java/org/apache/cassandra/cql3/QueryEvents.java
b/src/java/org/apache/cassandra/cql3/QueryEvents.java
index 52414ebb74..7b341ed200 100644
--- a/src/java/org/apache/cassandra/cql3/QueryEvents.java
+++ b/src/java/org/apache/cassandra/cql3/QueryEvents.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.cql3;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@@ -143,10 +142,14 @@ public class QueryEvents
}
}
+ /**
+ * Note: we use {@code byte[][]} for values of batch statement elements
instead of {@code List<ByteBuffer>} here
+ * to reduce memory allocation when we decode and process a batch statement
+ */
public void notifyBatchSuccess(BatchStatement.Type batchType,
List<? extends CQLStatement> statements,
List<String> queries,
- List<List<ByteBuffer>> values,
+ List<byte[][]> values,
QueryOptions options,
QueryState state,
long queryTime,
@@ -164,10 +167,14 @@ public class QueryEvents
}
}
+ /**
+ * Note: we use {@code byte[][]} for values of batch statement elements
instead of {@code List<ByteBuffer>} here
+ * to reduce memory allocation when we decode and process a batch statement
+ */
public void notifyBatchFailure(List<QueryHandler.Prepared> prepared,
BatchStatement.Type batchType,
List<Object> queryOrIdList,
- List<List<ByteBuffer>> values,
+ List<byte[][]> values,
QueryOptions options,
QueryState state,
Exception cause)
@@ -288,7 +295,7 @@ public class QueryEvents
default void batchSuccess(BatchStatement.Type batchType,
List<? extends CQLStatement> statements,
List<String> queries,
- List<List<ByteBuffer>> values,
+ List<byte[][]> values,
QueryOptions options,
QueryState state,
long queryTime,
@@ -296,7 +303,7 @@ public class QueryEvents
default void batchFailure(BatchStatement.Type batchType,
List<? extends CQLStatement> statements,
List<String> queries,
- List<List<ByteBuffer>> values,
+ List<byte[][]> values,
QueryOptions options,
QueryState state,
Exception cause) {}
diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java
b/src/java/org/apache/cassandra/cql3/QueryOptions.java
index c4e6e33b25..fae5072460 100644
--- a/src/java/org/apache/cassandra/cql3/QueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java
@@ -37,11 +37,14 @@ import org.apache.cassandra.transport.CBCodec;
import org.apache.cassandra.transport.CBUtil;
import org.apache.cassandra.transport.ProtocolException;
import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ByteArrayUtil;
import org.apache.cassandra.utils.CassandraUInt;
import org.apache.cassandra.utils.Pair;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
+import static
org.apache.cassandra.utils.ByteArrayUtil.convertToByteBufferValue;
+
/**
* Options for a query.
*/
@@ -49,6 +52,7 @@ public abstract class QueryOptions
{
public static final QueryOptions DEFAULT = new
DefaultQueryOptions(ConsistencyLevel.ONE,
Collections.emptyList(),
+
ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS,
false,
SpecificOptions.DEFAULT,
ProtocolVersion.CURRENT);
@@ -62,22 +66,22 @@ public abstract class QueryOptions
public static QueryOptions forInternalCalls(ConsistencyLevel consistency,
List<ByteBuffer> values)
{
- return new DefaultQueryOptions(consistency, values, false,
SpecificOptions.DEFAULT, ProtocolVersion.V3);
+ return new DefaultQueryOptions(consistency, values,
ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS, false, SpecificOptions.DEFAULT,
ProtocolVersion.V3);
}
public static QueryOptions forInternalCallsWithNowInSec(long nowInSec,
ConsistencyLevel consistency, List<ByteBuffer> values)
{
- return new DefaultQueryOptions(consistency, values, false,
SpecificOptions.DEFAULT.withNowInSec(nowInSec), ProtocolVersion.CURRENT);
+ return new DefaultQueryOptions(consistency, values,
ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS, false,
SpecificOptions.DEFAULT.withNowInSec(nowInSec), ProtocolVersion.CURRENT);
}
public static QueryOptions forInternalCalls(List<ByteBuffer> values)
{
- return new DefaultQueryOptions(ConsistencyLevel.ONE, values, false,
SpecificOptions.DEFAULT, ProtocolVersion.V3);
+ return new DefaultQueryOptions(ConsistencyLevel.ONE, values,
ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS, false, SpecificOptions.DEFAULT,
ProtocolVersion.V3);
}
public static QueryOptions forProtocolVersion(ProtocolVersion
protocolVersion)
{
- return new DefaultQueryOptions(null, null, true, null,
protocolVersion);
+ return new DefaultQueryOptions(null, null,
ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS, true, null, protocolVersion);
}
public static QueryOptions create(ConsistencyLevel consistency,
@@ -105,6 +109,7 @@ public abstract class QueryOptions
{
return new DefaultQueryOptions(consistency,
values,
+
ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS,
skipMetadata,
new SpecificOptions(pageSize,
pagingState, serialConsistency, timestamp, keyspace, nowInSeconds),
version);
@@ -127,6 +132,34 @@ public abstract class QueryOptions
public abstract ConsistencyLevel getConsistency();
public abstract List<ByteBuffer> getValues();
+
+ public abstract int getValuesSize();
+
+ public ByteBuffer getValue(int index)
+ {
+ return getValues().get(index);
+ }
+
+ /**
+ * to check if it is possible to get values as byte[] instead of ByteBuffer
+ * the logic is added as a memory allocation optimization
+ * to avoid creating of HeapByteBuffer wrappers for some typical requests
+ * getValues method is still in use and must be supported even if true is
returned
+ */
+ public boolean isByteArrayValuesGetSupported()
+ {
+ return false;
+ }
+
+ /**
+ * an allocation-optimized version of getValues() method
+ */
+ public byte[][] getByteArrayValues()
+ {
+ throw new IllegalStateException("getByteArrayValues() method is not
implemented, " +
+ "isByteArrayValuesGetSupported() must
be always checked before invoking this method");
+ }
+
public abstract boolean skipMetadata();
/**
@@ -338,7 +371,9 @@ public abstract class QueryOptions
static class DefaultQueryOptions extends QueryOptions
{
private final ConsistencyLevel consistency;
- private final List<ByteBuffer> values;
+ private List<ByteBuffer> values; // initialized on demand
+ private final byte[][] valuesAsByteArray;
+ private final boolean isByteArrayGetSupported;
private final boolean skipMetadata;
private final SpecificOptions options;
@@ -346,10 +381,14 @@ public abstract class QueryOptions
private final transient ProtocolVersion protocolVersion;
private final transient ReadThresholds readThresholds =
ReadThresholds.create();
- DefaultQueryOptions(ConsistencyLevel consistency, List<ByteBuffer>
values, boolean skipMetadata, SpecificOptions options, ProtocolVersion
protocolVersion)
+ DefaultQueryOptions(ConsistencyLevel consistency, List<ByteBuffer>
values,
+ byte[][] valuesAsByteArray, boolean skipMetadata,
+ SpecificOptions options, ProtocolVersion
protocolVersion)
{
this.consistency = consistency;
this.values = values;
+ isByteArrayGetSupported = (values == null);
+ this.valuesAsByteArray = valuesAsByteArray;
this.skipMetadata = skipMetadata;
this.options = options;
this.protocolVersion = protocolVersion;
@@ -362,9 +401,38 @@ public abstract class QueryOptions
public List<ByteBuffer> getValues()
{
+ if (values == null)
+ {
+ values = new ArrayList<>(valuesAsByteArray.length);
+ for (byte[] byteArrayValue : valuesAsByteArray)
+ values.add(convertToByteBufferValue(byteArrayValue));
+ }
return values;
}
+ public int getValuesSize()
+ {
+ return isByteArrayValuesGetSupported() ? valuesAsByteArray.length
: values.size();
+ }
+
+ public ByteBuffer getValue(int index)
+ {
+ if (values != null)
+ return values.get(index);
+
+ return convertToByteBufferValue(valuesAsByteArray[index]);
+ }
+
+ public boolean isByteArrayValuesGetSupported()
+ {
+ return isByteArrayGetSupported;
+ }
+
+ public byte[][] getByteArrayValues()
+ {
+ return valuesAsByteArray;
+ }
+
public boolean skipMetadata()
{
return skipMetadata;
@@ -406,6 +474,26 @@ public abstract class QueryOptions
return wrapped.getConsistency();
}
+ public int getValuesSize()
+ {
+ return wrapped.getValuesSize();
+ }
+
+ public ByteBuffer getValue(int index)
+ {
+ return this.wrapped.getValue(index);
+ }
+
+ public boolean isByteArrayValuesGetSupported()
+ {
+ return wrapped.isByteArrayValuesGetSupported();
+ }
+
+ public byte[][] getByteArrayValues()
+ {
+ return wrapped.getByteArrayValues();
+ }
+
public boolean skipMetadata()
{
return wrapped.skipMetadata();
@@ -625,19 +713,19 @@ public abstract class QueryOptions
? (int)body.readUnsignedInt()
: (int)body.readUnsignedByte();
- List<ByteBuffer> values = Collections.<ByteBuffer>emptyList();
+ byte[][] values = ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS;
List<String> names = null;
if (Flag.contains(flags, Flag.VALUES))
{
if (Flag.contains(flags, Flag.NAMES_FOR_VALUES))
{
- Pair<List<String>, List<ByteBuffer>> namesAndValues =
CBUtil.readNameAndValueList(body, version);
+ Pair<List<String>, byte[][]> namesAndValues =
CBUtil.readNameAndValueListAsByteArrays(body, version);
names = namesAndValues.left;
values = namesAndValues.right;
}
else
{
- values = CBUtil.readValueList(body, version);
+ values = CBUtil.readValueListAsByteArrays(body, version);
}
}
@@ -665,7 +753,7 @@ public abstract class QueryOptions
options = new SpecificOptions(pageSize, pagingState,
serialConsistency, timestamp, keyspace, nowInSeconds);
}
- DefaultQueryOptions opts = new DefaultQueryOptions(consistency,
values, skipMetadata, options, version);
+ DefaultQueryOptions opts = new DefaultQueryOptions(consistency,
null, values, skipMetadata, options, version);
return names == null ? opts : new OptionsWithNames(opts, names);
}
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 7cb1d668e0..56fca66220 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -914,19 +914,19 @@ public class QueryProcessor implements QueryHandler
public ResultMessage processPrepared(CQLStatement statement, QueryState
queryState, QueryOptions options, Dispatcher.RequestTime requestTime)
throws RequestExecutionException, RequestValidationException
{
- List<ByteBuffer> variables = options.getValues();
+ int variablesSize = options.getValuesSize();
// Check to see if there are any bound variables to verify
- if (!(variables.isEmpty() && statement.getBindVariables().isEmpty()))
+ if (!(variablesSize == 0 && statement.getBindVariables().isEmpty()))
{
- if (variables.size() != statement.getBindVariables().size())
+ if (variablesSize != statement.getBindVariables().size())
throw new InvalidRequestException(String.format("there were %d
markers(?) in CQL but %d bound variables",
statement.getBindVariables().size(),
-
variables.size()));
+
variablesSize));
// at this point there is a match in count between markers and
variables that is non-zero
if (logger.isTraceEnabled())
- for (int i = 0; i < variables.size(); i++)
- logger.trace("[{}] '{}'", i+1, variables.get(i));
+ for (int i = 0; i < variablesSize; i++)
+ logger.trace("[{}] '{}'", i+1, options.getValues().get(i));
}
metrics.preparedStatementsExecuted.inc();
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index 85a97ccc3d..a0a51a3829 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.Slice;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.guardrails.Guardrails;
import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.db.rows.ArrayCell;
import org.apache.cassandra.db.rows.BTreeRow;
import org.apache.cassandra.db.rows.BufferCell;
import org.apache.cassandra.db.rows.Cell;
@@ -165,16 +166,14 @@ public class UpdateParameters
return addCell(column, null, value);
}
- public Cell<?> addCell(ColumnMetadata column, CellPath path, ByteBuffer
value) throws InvalidRequestException
+ public Cell<?> addCell(ColumnMetadata column, byte[] value) throws
InvalidRequestException
{
- // General column value size
- Guardrails.columnValueSize.guard(value.remaining(),
column.name.toString(), false, clientState);
-
- // Check specific sizes per column type
- validateColumnSize(column, value);
+ return addCell(column, null, value);
+ }
- if (path != null && column.type.isMultiCell())
- Guardrails.columnValueSize.guard(path.dataSize(),
column.name.toString(), false, clientState);
+ public Cell<?> addCell(ColumnMetadata column, CellPath path, ByteBuffer
value) throws InvalidRequestException
+ {
+ validateCell(column, path, value.remaining());
Cell<?> cell = ttl == LivenessInfo.NO_TTL
? BufferCell.live(column, timestamp, value, path)
@@ -183,6 +182,29 @@ public class UpdateParameters
return cell;
}
+ public Cell<?> addCell(ColumnMetadata column, CellPath path, byte[] value)
throws InvalidRequestException
+ {
+ validateCell(column, path, value.length);
+
+ Cell<?> cell = ttl == LivenessInfo.NO_TTL
+ ? ArrayCell.live(column, timestamp, value, path)
+ : ArrayCell.expiring(column, timestamp, ttl, nowInSec,
value, path);
+ builder.addCell(cell);
+ return cell;
+ }
+
+ private void validateCell(ColumnMetadata column, CellPath path, int
valueSize)
+ {
+ // General column value size
+ Guardrails.columnValueSize.guard(valueSize, column.name.toString(),
false, clientState);
+
+ // Check specific sizes per column type
+ validateColumnSize(column, valueSize);
+
+ if (path != null && column.type.isMultiCell())
+ Guardrails.columnValueSize.guard(path.dataSize(),
column.name.toString(), false, clientState);
+ }
+
public void addRow(Row row)
{
newRow(row.clustering());
@@ -206,20 +228,20 @@ public class UpdateParameters
});
}
- private void validateColumnSize(ColumnMetadata column, ByteBuffer value)
+ private void validateColumnSize(ColumnMetadata column, int valueSize)
{
CQL3Type cql3Type = column.type.asCQL3Type();
if (cql3Type.equals(CQL3Type.Native.ASCII)) // Ascii size specific
guardrail
{
- Guardrails.columnAsciiValueSize.guard(value.remaining(),
column.name.toString(), false, clientState);
+ Guardrails.columnAsciiValueSize.guard(valueSize,
column.name.toString(), false, clientState);
}
else if (cql3Type.equals(CQL3Type.Native.BLOB)) // Blob size specific
guardrail
{
- Guardrails.columnBlobValueSize.guard(value.remaining(),
column.name.toString(), false, clientState);
+ Guardrails.columnBlobValueSize.guard(valueSize,
column.name.toString(), false, clientState);
}
else if (cql3Type.equals(CQL3Type.Native.TEXT)) // text and varchar
size specific guardrails
{
- Guardrails.columnTextAndVarcharValueSize.guard(value.remaining(),
column.name.toString(), false, clientState);
+ Guardrails.columnTextAndVarcharValueSize.guard(valueSize,
column.name.toString(), false, clientState);
}
}
diff --git a/src/java/org/apache/cassandra/cql3/terms/Constants.java
b/src/java/org/apache/cassandra/cql3/terms/Constants.java
index a912f5556d..49d9592567 100644
--- a/src/java/org/apache/cassandra/cql3/terms/Constants.java
+++ b/src/java/org/apache/cassandra/cql3/terms/Constants.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.ByteArrayUtil;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FastByteOperations;
@@ -479,11 +480,22 @@ public abstract class Constants
public void execute(DecoratedKey partitionKey, UpdateParameters
params) throws InvalidRequestException
{
- ByteBuffer value = t.bindAndGet(params.options);
- if (value == null)
- params.addTombstone(column);
- else if (value != ByteBufferUtil.UNSET_BYTE_BUFFER) // use
reference equality and not object equality
- params.addCell(column, value);
+ if (t.isByteArrayGetSupported(params.options))
+ {
+ byte[] value = t.bindAndGetByteArray(params.options);
+ if (value == null)
+ params.addTombstone(column);
+ else if (value != ByteArrayUtil.UNSET_BYTE_ARRAY) // use
reference equality and not object equality
+ params.addCell(column, value);
+ }
+ else
+ {
+ ByteBuffer value = t.bindAndGet(params.options);
+ if (value == null)
+ params.addTombstone(column);
+ else if (value != ByteBufferUtil.UNSET_BYTE_BUFFER) // use
reference equality and not object equality
+ params.addCell(column, value);
+ }
}
}
diff --git a/src/java/org/apache/cassandra/cql3/terms/Marker.java
b/src/java/org/apache/cassandra/cql3/terms/Marker.java
index 48fa02e9f4..26ff845987 100644
--- a/src/java/org/apache/cassandra/cql3/terms/Marker.java
+++ b/src/java/org/apache/cassandra/cql3/terms/Marker.java
@@ -26,9 +26,11 @@ import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.VariableSpecifications;
import org.apache.cassandra.cql3.functions.Function;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ByteArrayAccessor;
import org.apache.cassandra.db.marshal.MultiElementType;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.ByteArrayUtil;
import org.apache.cassandra.utils.ByteBufferUtil;
/**
@@ -74,7 +76,7 @@ public final class Marker extends Term.NonTerminal
{
try
{
- ByteBuffer bytes = options.getValues().get(bindIndex);
+ ByteBuffer bytes = options.getValue(bindIndex);
if (bytes == null)
return null;
@@ -93,6 +95,40 @@ public final class Marker extends Term.NonTerminal
}
}
+ public boolean isByteArrayGetSupported(QueryOptions options)
+ {
+ return options.isByteArrayValuesGetSupported();
+ }
+
+ /*
+ Same logic as bind() but it returns byte[] instead of ByteBuffer and
there is no Value wrapper usage
+ */
+ @Override
+ public byte[] bindAndGetByteArray(QueryOptions options) throws
InvalidRequestException
+ {
+ try
+ {
+ byte[] bytes = options.getByteArrayValues()[bindIndex];
+ if (bytes == null)
+ return null;
+
+ if (bytes == ByteArrayUtil.UNSET_BYTE_ARRAY)
+ return ByteArrayUtil.UNSET_BYTE_ARRAY;
+
+ if (receiver.type instanceof MultiElementType<?>)
+ {
+ MultiElementType<?> type = (MultiElementType<?>) receiver.type;
+ return
type.pack(type.filterSortAndValidateElementsFromArrays(type.unpack(bytes)),
ByteArrayAccessor.instance);
+ }
+ receiver.type.validate(bytes, ByteArrayAccessor.instance);
+ return bytes;
+ }
+ catch (MarshalException e)
+ {
+ throw new InvalidRequestException(e.getMessage());
+ }
+ }
+
@Override
public String toString()
{
diff --git a/src/java/org/apache/cassandra/cql3/terms/Term.java
b/src/java/org/apache/cassandra/cql3/terms/Term.java
index b7babb5e5e..0fc2673922 100644
--- a/src/java/org/apache/cassandra/cql3/terms/Term.java
+++ b/src/java/org/apache/cassandra/cql3/terms/Term.java
@@ -89,6 +89,24 @@ public interface Term
*/
ByteBuffer bindAndGet(QueryOptions options);
+ /**
+ * return true if an optimized bindAndGetByteArray() method is implemented
and can be used for this type of Term
+ * to retrieve a value as byte[] instead of default ByteBuffer provided by
bindAndGet()
+ */
+ default boolean isByteArrayGetSupported(QueryOptions options)
+ {
+ return false;
+ }
+
+ /**
+ * an allocation-optimized version of bindAndGet() method
+ */
+ default byte[] bindAndGetByteArray(QueryOptions options) throws
InvalidRequestException
+ {
+ throw new IllegalStateException("bindAndGetByteArray() method is not
implemented, " +
+ "isByteArrayGetSupported() must be
always checked before invoking this method");
+ }
+
/**
* A shorter for {@code bind(options).getElements()}.
* We expose it mainly because for constants it can avoid allocating a
temporary
diff --git a/src/java/org/apache/cassandra/db/marshal/ListType.java
b/src/java/org/apache/cassandra/db/marshal/ListType.java
index 6c391b050e..cb40b1ae2d 100644
--- a/src/java/org/apache/cassandra/db/marshal/ListType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ListType.java
@@ -280,6 +280,18 @@ public class ListType<T> extends CollectionType<List<T>>
return buffers;
}
+ @Override
+ public List<byte[]> filterSortAndValidateElementsFromArrays(List<byte[]>
buffers)
+ {
+ for (byte[] buffer: buffers)
+ {
+ if (buffer == null)
+ throw new MarshalException("null is not supported inside
collections");
+ elements.validate(buffer, ByteArrayAccessor.instance);
+ }
+ return buffers;
+ }
+
@Override
protected int compareNextCell(Iterator<Cell<?>> cellIterator,
Iterator<ByteBuffer> elementIter)
{
diff --git a/src/java/org/apache/cassandra/db/marshal/MapType.java
b/src/java/org/apache/cassandra/db/marshal/MapType.java
index 8ed43c3c8e..8d206c1572 100644
--- a/src/java/org/apache/cassandra/db/marshal/MapType.java
+++ b/src/java/org/apache/cassandra/db/marshal/MapType.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -383,26 +384,37 @@ public class MapType<K, V> extends CollectionType<Map<K,
V>>
@Override
public List<ByteBuffer> filterSortAndValidateElements(List<ByteBuffer>
buffers)
+ {
+ return filterSortAndValidateElements(buffers,
ByteBufferAccessor.instance, getKeysType().comparatorSet.buffer);
+ }
+
+ @Override
+ public List<byte[]> filterSortAndValidateElementsFromArrays(List<byte[]>
buffers)
+ {
+ return filterSortAndValidateElements(buffers,
ByteArrayAccessor.instance, getKeysType().comparatorSet.array);
+ }
+
+ private <T> List<T> filterSortAndValidateElements(List<T> buffers,
ValueAccessor<T> valueAccessor, Comparator<T> comparator)
{
// We depend on Maps to be properly sorted by their keys, so use a
sorted map implementation here.
- SortedMap<ByteBuffer, ByteBuffer> map = new TreeMap<>(getKeysType());
- Iterator<ByteBuffer> iter = buffers.iterator();
+ SortedMap<T, T> map = new TreeMap<>(comparator);
+ Iterator<T> iter = buffers.iterator();
while (iter.hasNext())
{
- ByteBuffer keyBytes = iter.next();
- ByteBuffer valueBytes = iter.next();
+ T keyBytes = iter.next();
+ T valueBytes = iter.next();
if (keyBytes == null || valueBytes == null)
throw new MarshalException("null is not supported inside
collections");
- getKeysType().validate(keyBytes);
- getValuesType().validate(valueBytes);
+ getKeysType().validate(keyBytes, valueAccessor);
+ getValuesType().validate(valueBytes, valueAccessor);
map.put(keyBytes, valueBytes);
}
- List<ByteBuffer> sortedBuffers = new ArrayList<>(map.size() << 1);
- for (Map.Entry<ByteBuffer, ByteBuffer> entry : map.entrySet())
+ List<T> sortedBuffers = new ArrayList<>(map.size() << 1);
+ for (Map.Entry<T, T> entry : map.entrySet())
{
sortedBuffers.add(entry.getKey());
sortedBuffers.add(entry.getValue());
diff --git a/src/java/org/apache/cassandra/db/marshal/MultiElementType.java
b/src/java/org/apache/cassandra/db/marshal/MultiElementType.java
index 99ce762d6a..dc8f912e09 100644
--- a/src/java/org/apache/cassandra/db/marshal/MultiElementType.java
+++ b/src/java/org/apache/cassandra/db/marshal/MultiElementType.java
@@ -99,6 +99,8 @@ public abstract class MultiElementType<T> extends
AbstractType<T>
*/
public abstract List<ByteBuffer>
filterSortAndValidateElements(List<ByteBuffer> buffers);
+ public abstract List<byte[]>
filterSortAndValidateElementsFromArrays(List<byte[]> buffers);
+
/**
* Compares the multicell value represensted by the column data with the
specified elements.
* @param columnData the column data representing the multicell value
diff --git a/src/java/org/apache/cassandra/db/marshal/SetType.java
b/src/java/org/apache/cassandra/db/marshal/SetType.java
index f2568e3cbc..8a8ae45e04 100644
--- a/src/java/org/apache/cassandra/db/marshal/SetType.java
+++ b/src/java/org/apache/cassandra/db/marshal/SetType.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.marshal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -255,12 +256,23 @@ public class SetType<T> extends CollectionType<Set<T>>
@Override
public List<ByteBuffer> filterSortAndValidateElements(List<ByteBuffer>
buffers)
{
- SortedSet<ByteBuffer> sorted = new TreeSet<>(elements);
- for (ByteBuffer buffer: buffers)
+ return filterSortAndValidateElements(buffers,
ByteBufferAccessor.instance, elements.comparatorSet.buffer);
+ }
+
+ @Override
+ public List<byte[]> filterSortAndValidateElementsFromArrays(List<byte[]>
buffers)
+ {
+ return filterSortAndValidateElements(buffers,
ByteArrayAccessor.instance, elements.comparatorSet.array);
+ }
+
+ private <V> List<V> filterSortAndValidateElements(List<V> buffers,
ValueAccessor<V> valueAccessor, Comparator<V> comparator)
+ {
+ SortedSet<V> sorted = new TreeSet<>(comparator);
+ for (V buffer: buffers)
{
if (buffer == null)
throw new MarshalException("null is not supported inside
collections");
- elements.validate(buffer);
+ elements.validate(buffer, valueAccessor);
sorted.add(buffer);
}
return new ArrayList<>(sorted);
diff --git a/src/java/org/apache/cassandra/db/marshal/TupleType.java
b/src/java/org/apache/cassandra/db/marshal/TupleType.java
index 47301b6e97..8004507762 100644
--- a/src/java/org/apache/cassandra/db/marshal/TupleType.java
+++ b/src/java/org/apache/cassandra/db/marshal/TupleType.java
@@ -40,6 +40,7 @@ import
org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.serializers.*;
import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ByteArrayUtil;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.JsonUtils;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
@@ -388,6 +389,17 @@ public class TupleType extends MultiElementType<ByteBuffer>
@Override
public List<ByteBuffer> filterSortAndValidateElements(List<ByteBuffer>
buffers)
+ {
+ return filterSortAndValidateElements(buffers,
ByteBufferUtil.UNSET_BYTE_BUFFER, ByteBufferAccessor.instance);
+ }
+
+ @Override
+ public List<byte[]> filterSortAndValidateElementsFromArrays(List<byte[]>
buffers)
+ {
+ return filterSortAndValidateElements(buffers,
ByteArrayUtil.UNSET_BYTE_ARRAY, ByteArrayAccessor.instance);
+ }
+
+ private <T> List<T> filterSortAndValidateElements(List<T> buffers, T
unsetValue, ValueAccessor<T> valueAccessor)
{
if (buffers.size() > size())
throw new MarshalException(String.format("Tuple value contains too
many fields (expected %s, got %s)", size(), buffers.size()));
@@ -395,12 +407,12 @@ public class TupleType extends
MultiElementType<ByteBuffer>
for (int i = 0; i < buffers.size(); i++)
{
// Since A tuple value is always written in its entirety Cassandra
can't preserve a pre-existing value by 'not setting' the new value. Reject the
query.
- ByteBuffer buffer = buffers.get(i);
+ T buffer = buffers.get(i);
if (buffer == null)
continue;
- if (buffer == ByteBufferUtil.UNSET_BYTE_BUFFER)
+ if (buffer == unsetValue)
throw new InvalidRequestException(String.format("Invalid unset
value for tuple field number %d", i));
- type(i).validate(buffer);
+ type(i).validate(buffer, valueAccessor);
}
return buffers;
diff --git a/src/java/org/apache/cassandra/db/marshal/UserType.java
b/src/java/org/apache/cassandra/db/marshal/UserType.java
index 23b172c816..1f0f703070 100644
--- a/src/java/org/apache/cassandra/db/marshal/UserType.java
+++ b/src/java/org/apache/cassandra/db/marshal/UserType.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.serializers.TypeSerializer;
import org.apache.cassandra.serializers.UserTypeSerializer;
import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ByteArrayUtil;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.JsonUtils;
import org.apache.cassandra.utils.Pair;
@@ -644,6 +645,17 @@ public class UserType extends TupleType implements
SchemaElement
@Override
public List<ByteBuffer> filterSortAndValidateElements(List<ByteBuffer>
buffers)
+ {
+ return filterSortAndValidateElements(buffers,
ByteBufferUtil.UNSET_BYTE_BUFFER, ByteBufferAccessor.instance);
+ }
+
+ @Override
+ public List<byte[]> filterSortAndValidateElementsFromArrays(List<byte[]>
buffers)
+ {
+ return filterSortAndValidateElements(buffers,
ByteArrayUtil.UNSET_BYTE_ARRAY, ByteArrayAccessor.instance);
+ }
+
+ private <T> List<T> filterSortAndValidateElements(List<T> buffers, T
unsetValue, ValueAccessor<T> valueAccessor)
{
if (buffers.size() > size())
throw new MarshalException(String.format("UDT value contained too
many fields (expected %s, got %s)", size(), buffers.size()));
@@ -652,12 +664,12 @@ public class UserType extends TupleType implements
SchemaElement
{
// Since a frozen UDT value is always written in its entirety
Cassandra can't preserve a pre-existing
// value by 'not setting' the new value. Reject the query.
- ByteBuffer buffer = buffers.get(i);
+ T buffer = buffers.get(i);
if (buffer == null)
continue;
- if (!isMultiCell() && buffer == ByteBufferUtil.UNSET_BYTE_BUFFER)
+ if (!isMultiCell() && buffer == unsetValue)
throw new MarshalException(String.format("Invalid unset value
for field '%s' of user defined type %s", fieldNameAsString(i),
getNameAsString()));
- type(i).validate(buffer);
+ type(i).validate(buffer, valueAccessor);
}
return buffers;
diff --git a/src/java/org/apache/cassandra/db/marshal/VectorType.java
b/src/java/org/apache/cassandra/db/marshal/VectorType.java
index e70857c5aa..3cdf72270e 100644
--- a/src/java/org/apache/cassandra/db/marshal/VectorType.java
+++ b/src/java/org/apache/cassandra/db/marshal/VectorType.java
@@ -37,6 +37,7 @@ import
org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.serializers.TypeSerializer;
import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ByteArrayUtil;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.JsonUtils;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
@@ -194,20 +195,31 @@ public final class VectorType<T> extends
MultiElementType<List<T>>
@Override
public List<ByteBuffer> filterSortAndValidateElements(List<ByteBuffer>
buffers)
+ {
+ return filterSortAndValidateElements(buffers,
ByteBufferUtil.UNSET_BYTE_BUFFER, ByteBufferAccessor.instance);
+ }
+
+ @Override
+ public List<byte[]> filterSortAndValidateElementsFromArrays(List<byte[]>
buffers)
+ {
+ return filterSortAndValidateElements(buffers,
ByteArrayUtil.UNSET_BYTE_ARRAY, ByteArrayAccessor.instance);
+ }
+
+ public <V> List<V> filterSortAndValidateElements(List<V> buffers, V
unsetValue, ValueAccessor<V> valueAccessor)
{
// We only filter and validate for this type.
if (buffers == null)
return null;
- for (ByteBuffer buffer: buffers)
+ for (V buffer : buffers)
{
- if (buffer == null || elementType.isNull(buffer))
+ if (buffer == null || elementType.isNull(buffer, valueAccessor))
throw new MarshalException("null is not supported inside
vectors");
- if (buffer == ByteBufferUtil.UNSET_BYTE_BUFFER )
+ if (buffer == unsetValue)
throw new InvalidRequestException("unset is not supported
inside vectors");
- elementType.validate(buffer);
+ elementType.validate(buffer, valueAccessor);
}
return buffers;
}
diff --git a/src/java/org/apache/cassandra/db/rows/ArrayCell.java
b/src/java/org/apache/cassandra/db/rows/ArrayCell.java
index 8201b6849f..77d44c3eb7 100644
--- a/src/java/org/apache/cassandra/db/rows/ArrayCell.java
+++ b/src/java/org/apache/cassandra/db/rows/ArrayCell.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.rows;
import java.nio.ByteBuffer;
+import org.apache.cassandra.db.ExpirationDateOverflowHandling;
import org.apache.cassandra.db.marshal.ByteArrayAccessor;
import org.apache.cassandra.db.marshal.ByteType;
import org.apache.cassandra.db.marshal.ValueAccessor;
@@ -59,6 +60,17 @@ public class ArrayCell extends AbstractCell<byte[]>
this.path = path;
}
+ public static ArrayCell live(ColumnMetadata column, long timestamp, byte[]
value, CellPath path)
+ {
+ return new ArrayCell(column, timestamp, NO_TTL, NO_DELETION_TIME,
value, path);
+ }
+
+ public static ArrayCell expiring(ColumnMetadata column, long timestamp,
int ttl, long nowInSec, byte[] value, CellPath path)
+ {
+ assert ttl != NO_TTL;
+ return new ArrayCell(column, timestamp, ttl,
ExpirationDateOverflowHandling.computeLocalExpirationTime(nowInSec, ttl),
value, path);
+ }
+
public long timestamp()
{
return timestamp;
diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java
b/src/java/org/apache/cassandra/db/rows/Cell.java
index ced2f5229a..d3d0eb0416 100644
--- a/src/java/org/apache/cassandra/db/rows/Cell.java
+++ b/src/java/org/apache/cassandra/db/rows/Cell.java
@@ -120,6 +120,11 @@ public abstract class Cell<V> extends ColumnData
return accessor().toBuffer(value());
}
+ public byte[] valueAsArray()
+ {
+ return accessor().toArray(value());
+ }
+
/**
* The cell timestamp.
* <p>
diff --git a/src/java/org/apache/cassandra/db/rows/NativeCell.java
b/src/java/org/apache/cassandra/db/rows/NativeCell.java
index ebea8a319b..56301cfa7a 100644
--- a/src/java/org/apache/cassandra/db/rows/NativeCell.java
+++ b/src/java/org/apache/cassandra/db/rows/NativeCell.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.cassandra.db.marshal.AddressBasedNativeData;
+import org.apache.cassandra.db.marshal.ByteArrayAccessor;
import org.apache.cassandra.db.marshal.NativeAccessor;
import org.apache.cassandra.db.marshal.NativeData;
import org.apache.cassandra.db.marshal.ValueAccessor;
@@ -66,18 +67,37 @@ public class NativeCell extends AbstractCell<NativeData>
implements NativeData
this.peer = 0;
}
- public NativeCell(AddressBasedAllocator allocator,
- OpOrder.Group writeOp,
- Cell<?> cell)
+ public static NativeCell build(AddressBasedAllocator allocator,
+ OpOrder.Group writeOp,
+ Cell<?> cell)
{
- this(allocator,
- writeOp,
- cell.column(),
- cell.timestamp(),
- cell.ttl(),
- cell.localDeletionTimeAsUnsignedInt(),
- cell.buffer(),
- cell.path());
+ if (cell.accessor() == ByteArrayAccessor.instance) // to avoid
ByteBuffer allocation via cell.value()
+ {
+ byte[] value = cell.valueAsArray();
+ return new NativeCell(allocator,
+ writeOp,
+ cell.column(),
+ cell.timestamp(),
+ cell.ttl(),
+ cell.localDeletionTimeAsUnsignedInt(),
+ value,
+ value.length,
+ cell.path());
+ }
+ else
+ {
+ ByteBuffer byteBuffer = cell.buffer();
+ assert byteBuffer.order() == ByteOrder.BIG_ENDIAN;
+ return new NativeCell(allocator,
+ writeOp,
+ cell.column(),
+ cell.timestamp(),
+ cell.ttl(),
+ cell.localDeletionTimeAsUnsignedInt(),
+ byteBuffer,
+ byteBuffer.remaining(),
+ cell.path());
+ }
}
// Please keep both int/long overloaded ctros public. Otherwise silent
casts will mess timestamps when one is not
@@ -91,7 +111,7 @@ public class NativeCell extends AbstractCell<NativeData>
implements NativeData
ByteBuffer value,
CellPath path)
{
- this(allocator, writeOp, column, timestamp, ttl,
deletionTimeLongToUnsignedInteger(localDeletionTime), value, path);
+ this(allocator, writeOp, column, timestamp, ttl,
deletionTimeLongToUnsignedInteger(localDeletionTime), value, value.remaining(),
path);
}
public NativeCell(AddressBasedAllocator allocator,
@@ -100,13 +120,13 @@ public class NativeCell extends AbstractCell<NativeData>
implements NativeData
long timestamp,
int ttl,
int localDeletionTimeUnsignedInteger,
- ByteBuffer value,
+ Object value,
+ int valueLength,
CellPath path)
{
super(column);
- long size = offHeapSizeWithoutPath(value.remaining());
+ long size = offHeapSizeWithoutPath(valueLength);
- assert value.order() == ByteOrder.BIG_ENDIAN;
assert column.isComplex() == (path != null);
if (path != null)
{
@@ -123,15 +143,20 @@ public class NativeCell extends AbstractCell<NativeData>
implements NativeData
NativeEndianMemoryUtil.setLong(peer + TIMESTAMP, timestamp);
NativeEndianMemoryUtil.setInt(peer + TTL, ttl);
NativeEndianMemoryUtil.setInt(peer + DELETION,
localDeletionTimeUnsignedInteger);
- NativeEndianMemoryUtil.setInt(peer + LENGTH, value.remaining());
- MemoryUtil.setBytes(peer + VALUE, value);
+ NativeEndianMemoryUtil.setInt(peer + LENGTH, valueLength);
+ if (value instanceof byte[])
+ MemoryUtil.setBytes(peer + VALUE, (byte[]) value, 0, valueLength);
+ else if (value instanceof ByteBuffer)
+ MemoryUtil.setBytes(peer + VALUE, (ByteBuffer) value);
+ else
+ throw new IllegalArgumentException();
if (path != null)
{
ByteBuffer pathbuffer = path.get(0);
assert pathbuffer.order() == ByteOrder.BIG_ENDIAN;
- long offset = peer + VALUE + value.remaining();
+ long offset = peer + VALUE + valueLength;
NativeEndianMemoryUtil.setInt(offset, pathbuffer.remaining());
MemoryUtil.setBytes(offset + 4, pathbuffer);
}
diff --git a/src/java/org/apache/cassandra/fql/FullQueryLogger.java
b/src/java/org/apache/cassandra/fql/FullQueryLogger.java
index abbfa558fe..a9697ba49f 100644
--- a/src/java/org/apache/cassandra/fql/FullQueryLogger.java
+++ b/src/java/org/apache/cassandra/fql/FullQueryLogger.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.fql;
-import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
@@ -268,7 +267,7 @@ public class FullQueryLogger implements QueryEvents.Listener
public void batchSuccess(BatchStatement.Type type,
List<? extends CQLStatement> statements,
List<String> queries,
- List<List<ByteBuffer>> values,
+ List<byte[][]> values,
QueryOptions queryOptions,
QueryState queryState,
long batchTimeMillis,
@@ -383,11 +382,11 @@ public class FullQueryLogger implements
QueryEvents.Listener
private final int weight;
private final BatchStatement.Type batchType;
private final List<String> queries;
- private final List<List<ByteBuffer>> values;
+ private final List<byte[][]> values;
public Batch(BatchStatement.Type batchType,
List<String> queries,
- List<List<ByteBuffer>> values,
+ List<byte[][]> values,
QueryOptions queryOptions,
QueryState queryState,
long batchTimeMillis)
@@ -406,11 +405,11 @@ public class FullQueryLogger implements
QueryEvents.Listener
queriesSize += ObjectSizes.sizeOf(checkNotNull(query));
long valuesSize = EMPTY_LIST_SIZE +
ObjectSizes.sizeOfReferenceArray(values.size());
- for (List<ByteBuffer> subValues : values)
+ for (byte[][] subValues : values)
{
- valuesSize += EMPTY_LIST_SIZE +
ObjectSizes.sizeOfReferenceArray(subValues.size());
- for (ByteBuffer subValue : subValues)
- valuesSize += ObjectSizes.sizeOnHeapOf(subValue);
+ valuesSize += EMPTY_LIST_SIZE +
ObjectSizes.sizeOfReferenceArray(subValues.length);
+ for (byte[] subValue : subValues)
+ valuesSize += ObjectSizes.sizeOfArray(subValue);
}
// No need to add the batch type which is an enum.
@@ -450,10 +449,10 @@ public class FullQueryLogger implements
QueryEvents.Listener
}
valueOut = wire.write(VALUES);
valueOut.int32(values.size());
- for (List<ByteBuffer> subValues : values)
+ for (byte[][] subValues : values)
{
- valueOut.int32(subValues.size());
- for (ByteBuffer value : subValues)
+ valueOut.int32(subValues.length);
+ for (byte[] value : subValues)
valueOut.bytes(value == null ? null :
BytesStore.wrap(value));
}
}
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java
b/src/java/org/apache/cassandra/transport/CBUtil.java
index 1c73206c67..3c6a3a9011 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -42,6 +42,7 @@ import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.utils.ByteArrayUtil;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.LazyToString;
import org.apache.cassandra.utils.Pair;
@@ -568,6 +569,35 @@ public abstract class CBUtil
return l;
}
+ public static byte[][] readValueListAsByteArrays(ByteBuf cb,
ProtocolVersion protocolVersion)
+ {
+ int size = cb.readUnsignedShort();
+ if (size == 0)
+ return ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS;
+
+ byte[][] l = new byte[size][];
+ for (int i = 0; i < size; i++)
+ l[i] = readBoundValueAsByteArray(cb, protocolVersion);
+ return l;
+ }
+
+ public static byte[] readBoundValueAsByteArray(ByteBuf cb, ProtocolVersion
protocolVersion)
+ {
+ int length = cb.readInt();
+ if (length < 0)
+ {
+ if (protocolVersion.isSmallerThan(ProtocolVersion.V4)) // backward
compatibility for pre-version 4
+ return null;
+ if (length == -1)
+ return null;
+ else if (length == -2)
+ return ByteArrayUtil.UNSET_BYTE_ARRAY;
+ else
+ throw new ProtocolException("Invalid ByteBuf length " +
length);
+ }
+ return readRawBytes(cb, length);
+ }
+
public static void writeValueList(List<ByteBuffer> values, ByteBuf cb)
{
cb.writeShort(values.size());
@@ -575,6 +605,13 @@ public abstract class CBUtil
CBUtil.writeValue(value, cb);
}
+ public static void writeValueListOfByteArrays(byte[][] values, ByteBuf cb)
+ {
+ cb.writeShort(values.length);
+ for (byte[] value : values)
+ CBUtil.writeValue(value, cb);
+ }
+
public static int sizeOfValueList(List<ByteBuffer> values)
{
int size = 2;
@@ -583,6 +620,14 @@ public abstract class CBUtil
return size;
}
+ public static int sizeOfValueListOfByteArrays(byte[][] values)
+ {
+ int size = 2;
+ for (byte[] value : values)
+ size += CBUtil.sizeOfValue(value);
+ return size;
+ }
+
public static Pair<List<String>, List<ByteBuffer>>
readNameAndValueList(ByteBuf cb, ProtocolVersion protocolVersion)
{
int size = cb.readUnsignedShort();
@@ -599,6 +644,22 @@ public abstract class CBUtil
return Pair.create(s, l);
}
+ public static Pair<List<String>, byte[][]>
readNameAndValueListAsByteArrays(ByteBuf cb, ProtocolVersion protocolVersion)
+ {
+ int size = cb.readUnsignedShort();
+ if (size == 0)
+ return Pair.create(Collections.<String>emptyList(),
ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS);
+
+ List<String> s = new ArrayList<>(size);
+ byte[][] l = new byte[size][];
+ for (int i = 0; i < size; i++)
+ {
+ s.add(readString(cb));
+ l[i] = readBoundValueAsByteArray(cb, protocolVersion);
+ }
+ return Pair.create(s, l);
+ }
+
public static InetSocketAddress readInet(ByteBuf cb)
{
int addrSize = cb.readByte() & 0xFF;
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index d45105f109..5c0e09be42 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.transport.messages;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -58,7 +57,7 @@ public class BatchMessage extends Message.Request
byte type = body.readByte();
int n = body.readUnsignedShort();
List<Object> queryOrIds = new ArrayList<>(n);
- List<List<ByteBuffer>> variables = new ArrayList<>(n);
+ List<byte[][]> variables = new ArrayList<>(n);
for (int i = 0; i < n; i++)
{
byte kind = body.readByte();
@@ -68,7 +67,7 @@ public class BatchMessage extends Message.Request
queryOrIds.add(MD5Digest.wrap(CBUtil.readBytes(body)));
else
throw new ProtocolException("Invalid query kind in BATCH
messages. Must be 0 or 1 but got " + kind);
- variables.add(CBUtil.readValueList(body, version));
+ variables.add(CBUtil.readValueListAsByteArrays(body, version));
}
QueryOptions options = QueryOptions.codec.decode(body, version);
@@ -91,7 +90,7 @@ public class BatchMessage extends Message.Request
else
CBUtil.writeBytes(((MD5Digest)q).bytes, dest);
- CBUtil.writeValueList(msg.values.get(i), dest);
+ CBUtil.writeValueListOfByteArrays(msg.values.get(i), dest);
}
if (version.isSmallerThan(ProtocolVersion.V3))
@@ -110,7 +109,7 @@ public class BatchMessage extends Message.Request
? CBUtil.sizeOfLongString((String)q)
: CBUtil.sizeOfBytes(((MD5Digest)q).bytes));
- size += CBUtil.sizeOfValueList(msg.values.get(i));
+ size += CBUtil.sizeOfValueListOfByteArrays(msg.values.get(i));
}
size += version.isSmallerThan(ProtocolVersion.V3)
? CBUtil.sizeOfConsistencyLevel(msg.options.getConsistency())
@@ -145,10 +144,10 @@ public class BatchMessage extends Message.Request
public final BatchStatement.Type batchType;
public final List<Object> queryOrIdList;
- public final List<List<ByteBuffer>> values;
+ public final List<byte[][]> values;
public final QueryOptions options;
- public BatchMessage(BatchStatement.Type type, List<Object> queryOrIdList,
List<List<ByteBuffer>> values, QueryOptions options)
+ public BatchMessage(BatchStatement.Type type, List<Object> queryOrIdList,
List<byte[][]> values, QueryOptions options)
{
super(Message.Type.BATCH);
this.batchType = type;
@@ -197,11 +196,11 @@ public class BatchMessage extends Message.Request
throw new
PreparedQueryNotFoundException((MD5Digest)query);
}
- List<ByteBuffer> queryValues = values.get(i);
- if (queryValues.size() !=
p.statement.getBindVariables().size())
+ byte[][] queryValues = values.get(i);
+ if (queryValues.length !=
p.statement.getBindVariables().size())
throw new InvalidRequestException(String.format("There
were %d markers(?) in CQL but %d bound variables",
p.statement.getBindVariables().size(),
-
queryValues.size()));
+
queryValues.length));
prepared.add(p);
}
@@ -260,7 +259,7 @@ public class BatchMessage extends Message.Request
for (int i = 0; i < queryOrIdList.size(); i++)
{
if (i > 0) sb.append(", ");
- sb.append(queryOrIdList.get(i)).append(" with
").append(values.get(i).size()).append(" values");
+ sb.append(queryOrIdList.get(i)).append(" with
").append(values.get(i).length).append(" values");
}
sb.append("] at consistency ").append(options.getConsistency());
return sb.toString();
diff --git a/src/java/org/apache/cassandra/utils/ByteArrayUtil.java
b/src/java/org/apache/cassandra/utils/ByteArrayUtil.java
index f0e797c105..13fb44c3bb 100644
--- a/src/java/org/apache/cassandra/utils/ByteArrayUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteArrayUtil.java
@@ -33,6 +33,8 @@ import org.apache.cassandra.io.util.DataOutputPlus;
public class ByteArrayUtil
{
public static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+ public static final byte[][] EMPTY_ARRAY_OF_BYTE_ARRAYS = new byte[0][];
+ public static final byte[] UNSET_BYTE_ARRAY = new byte[0];
public static int compareUnsigned(byte[] o1, byte[] o2)
{
@@ -277,4 +279,16 @@ public class ByteArrayUtil
{
FastByteOperations.copy(src, srcPos, dst, dstPos, length);
}
+
+
+ public static ByteBuffer convertToByteBufferValue(byte[] arrayValue)
+ {
+ if (arrayValue == null)
+ return null;
+ if (arrayValue == ByteArrayUtil.UNSET_BYTE_ARRAY)
+ return ByteBufferUtil.UNSET_BYTE_BUFFER; // it is important for
correctness to preserve unset value
+ if (arrayValue == ByteArrayUtil.EMPTY_BYTE_ARRAY)
+ return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+ return ByteBuffer.wrap(arrayValue);
+ }
}
diff --git a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
index 22ee46c68c..cc30e7c4b7 100644
--- a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
@@ -87,7 +87,7 @@ public class NativeAllocator extends MemtableAllocator
implements AddressBasedAl
@Override
public void addCell(Cell<?> cell)
{
- super.addCell(new NativeCell(allocator, writeOp, cell));
+ super.addCell(NativeCell.build(allocator, writeOp, cell));
}
}
@@ -153,7 +153,7 @@ public class NativeAllocator extends MemtableAllocator
implements AddressBasedAl
@Override
public Cell<?> clone(Cell<?> cell)
{
- return new NativeCell(this, opGroup, cell);
+ return NativeCell.build(this, opGroup, cell);
}
@Override
diff --git
a/test/microbench/org/apache/cassandra/test/microbench/BatchStatementBench.java
b/test/microbench/org/apache/cassandra/test/microbench/BatchStatementBench.java
index dd3e1a1d34..98abda9cda 100644
---
a/test/microbench/org/apache/cassandra/test/microbench/BatchStatementBench.java
+++
b/test/microbench/org/apache/cassandra/test/microbench/BatchStatementBench.java
@@ -18,14 +18,11 @@
package org.apache.cassandra.test.microbench;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import com.google.common.collect.Lists;
-
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.Attributes;
import org.apache.cassandra.cql3.BatchQueryOptions;
@@ -44,6 +41,7 @@ import org.apache.cassandra.schema.SchemaTestUtil;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.transport.Dispatcher;
+import org.apache.cassandra.utils.ByteArrayUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -64,8 +62,6 @@ import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
-import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
-
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@@ -108,14 +104,14 @@ public class BatchStatementBench
SchemaTestUtil.addOrUpdateKeyspace(ksm.withSwapped(ksm.tables.with(metadata)),
false);
List<ModificationStatement> modifications = new ArrayList<>(batchSize);
- List<List<ByteBuffer>> parameters = new ArrayList<>(batchSize);
+ List<byte[][]> parameters = new ArrayList<>(batchSize);
List<Object> queryOrIdList = new ArrayList<>(batchSize);
QueryHandler.Prepared prepared =
QueryProcessor.prepareInternal(String.format("INSERT INTO %s.%s (id, ck, v)
VALUES (?,?,?)", keyspace, table));
for (int i = 0; i < batchSize; i++)
{
modifications.add((ModificationStatement) prepared.statement);
- parameters.add(Lists.newArrayList(bytes(uniquePartition ? i : 1),
bytes(i), bytes(i)));
+ parameters.add(new byte[][] {ByteArrayUtil.bytes(uniquePartition ?
i : 1), ByteArrayUtil.bytes(i), ByteArrayUtil.bytes(i)});
queryOrIdList.add(prepared.rawCQLStatement);
}
bs = new BatchStatement(BatchStatement.Type.UNLOGGED,
VariableSpecifications.empty(), modifications, Attributes.none());
diff --git a/test/unit/org/apache/cassandra/cql3/QueryEventsTest.java
b/test/unit/org/apache/cassandra/cql3/QueryEventsTest.java
index dd1cb45760..1b97d1eb71 100644
--- a/test/unit/org/apache/cassandra/cql3/QueryEventsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/QueryEventsTest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.cql3;
-import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -39,8 +38,9 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.transport.Message;
import org.apache.cassandra.transport.messages.ResultMessage;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ByteArrayUtil;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -144,9 +144,9 @@ public class QueryEventsTest extends CQLTester
assertEquals(3, listener.queries.size());
assertEquals(BatchStatement.Type.UNLOGGED, listener.batchType);
assertEquals(newArrayList(q1, q1, q2), listener.queries);
- assertEquals(newArrayList(newArrayList(ByteBufferUtil.bytes(1),
ByteBufferUtil.bytes(1)),
- newArrayList(ByteBufferUtil.bytes(2),
ByteBufferUtil.bytes(2)),
- newArrayList(newArrayList())),
listener.values);
+ assertListOfArraysEquals(newArrayList(new byte[][]
{ByteArrayUtil.bytes(1), ByteArrayUtil.bytes(1)},
+ new byte[][]
{ByteArrayUtil.bytes(2), ByteArrayUtil.bytes(2)},
+ new byte[][] {}),
listener.values);
batch.add(new SimpleStatement("insert into abc.def (id, v) values
(1,1)"));
try
@@ -162,10 +162,17 @@ public class QueryEventsTest extends CQLTester
assertEquals(3, listener.queries.size());
assertEquals(BatchStatement.Type.UNLOGGED, listener.batchType);
assertEquals(newArrayList(q1, q1, q2), listener.queries);
- assertEquals(newArrayList(newArrayList(ByteBufferUtil.bytes(1),
ByteBufferUtil.bytes(1)),
- newArrayList(ByteBufferUtil.bytes(2),
ByteBufferUtil.bytes(2)),
- newArrayList(newArrayList()),
- newArrayList(newArrayList())),
listener.values);
+ assertListOfArraysEquals(newArrayList(new byte[][] {
ByteArrayUtil.bytes(1), ByteArrayUtil.bytes(1)},
+ new byte[][]
{ByteArrayUtil.bytes(2), ByteArrayUtil.bytes(2)},
+ new byte[][] {},
+ new byte[][] {}),
listener.values);
+ }
+
+ private static void assertListOfArraysEquals(List<byte[][]> expected,
List<byte[][]> actual)
+ {
+ assertEquals(expected.size(), actual.size());
+ for (int i = 0; i < expected.size(); i++)
+ assertArrayEquals(expected.get(i), actual.get(i));
}
@Test
@@ -323,7 +330,7 @@ public class QueryEventsTest extends CQLTester
private static class BatchMockListener extends MockListener
{
private List<String> queries;
- private List<List<ByteBuffer>> values;
+ private List<byte[][]> values;
private BatchStatement.Type batchType;
BatchMockListener(ColumnFamilyStore currentColumnFamilyStore)
@@ -331,7 +338,7 @@ public class QueryEventsTest extends CQLTester
super(currentColumnFamilyStore);
}
- public void batchSuccess(BatchStatement.Type batchType, List<? extends
CQLStatement> statements, List<String> queries, List<List<ByteBuffer>> values,
QueryOptions options, QueryState state, long queryTime, Message.Response
response)
+ public void batchSuccess(BatchStatement.Type batchType, List<? extends
CQLStatement> statements, List<String> queries, List<byte[][]> values,
QueryOptions options, QueryState state, long queryTime, Message.Response
response)
{
inc("batchSuccess");
this.queries = queries;
@@ -340,7 +347,7 @@ public class QueryEventsTest extends CQLTester
this.queryTime = queryTime;
}
- public void batchFailure(BatchStatement.Type batchType, List<? extends
CQLStatement> statements, List<String> queries, List<List<ByteBuffer>> values,
QueryOptions options, QueryState state, Exception cause)
+ public void batchFailure(BatchStatement.Type batchType, List<? extends
CQLStatement> statements, List<String> queries, List<byte[][]> values,
QueryOptions options, QueryState state, Exception cause)
{
inc("batchFailure");
this.queries = queries;
diff --git a/test/unit/org/apache/cassandra/fql/FullQueryLoggerTest.java
b/test/unit/org/apache/cassandra/fql/FullQueryLoggerTest.java
index e95eb0c8b4..ec3ac0df91 100644
--- a/test/unit/org/apache/cassandra/fql/FullQueryLoggerTest.java
+++ b/test/unit/org/apache/cassandra/fql/FullQueryLoggerTest.java
@@ -58,6 +58,7 @@ import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ByteArrayUtil;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.binlog.BinLogTest;
@@ -462,9 +463,9 @@ public class FullQueryLoggerTest extends CQLTester
configureFQL();
logBatch(Type.UNLOGGED,
Arrays.asList("foo1", "foo2"),
- Arrays.asList(Arrays.asList(ByteBuffer.allocate(1),
- ByteBuffer.allocateDirect(2)),
- Collections.emptyList()),
+ Arrays.asList(new byte[][] {new byte[1],
+ new byte[2]},
+ ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS),
QueryOptions.DEFAULT,
queryState("abcdefgh"),
1);
@@ -486,9 +487,9 @@ public class FullQueryLoggerTest extends CQLTester
configureFQL();
logBatch(Type.UNLOGGED,
Arrays.asList("foo1", "foo2"),
- Arrays.asList(Arrays.asList(ByteBuffer.allocate(1),
- ByteBuffer.allocateDirect(2)),
- Collections.emptyList()),
+ Arrays.asList(new byte[][] {new byte[1],
+ new byte[2]},
+ ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS),
QueryOptions.DEFAULT,
queryState(),
1);
@@ -599,12 +600,10 @@ public class FullQueryLoggerTest extends CQLTester
bigList = null;
//The size of the list of values should be reflected
- List<List<ByteBuffer>> bigValues = new ArrayList<>(100000);
+ List<byte[][]> bigValues = new ArrayList<>(100000);
for (int ii = 0; ii < 100000; ii++)
- {
- bigValues.add(new ArrayList<>(0));
- }
- bigValues.get(0).add(ByteBuffer.allocate(1024 * 1024 * 5));
+ bigValues.add(ii == 0 ? new byte[][] {new byte[1024 * 1024 * 5]} :
new byte[][]{});
+
batch = new Batch(Type.UNLOGGED, new ArrayList<>(), bigValues,
QueryOptions.DEFAULT, queryState(), 1);
assertTrue(batch.weight() > ObjectSizes.measureDeep(bigValues));
@@ -642,7 +641,7 @@ public class FullQueryLoggerTest extends CQLTester
@Test(expected = NullPointerException.class)
public void testLogBatchNullValuesValue() throws Exception
{
- logBatch(Type.UNLOGGED, new ArrayList<>(),
Arrays.asList((List<ByteBuffer>)null), null, queryState(), 1);
+ logBatch(Type.UNLOGGED, new ArrayList<>(),
Collections.singletonList(null), null, queryState(), 1);
}
@Test(expected = NullPointerException.class)
@@ -737,7 +736,7 @@ public class FullQueryLoggerTest extends CQLTester
private void logBatch(BatchStatement.Type type,
List<String> queries,
- List<List<ByteBuffer>> values,
+ List<byte[][]> values,
QueryOptions options,
QueryState queryState,
long time)
diff --git
a/test/unit/org/apache/cassandra/metrics/ClientRequestRowAndColumnMetricsTest.java
b/test/unit/org/apache/cassandra/metrics/ClientRequestRowAndColumnMetricsTest.java
index b94ed8db48..11ec07ba99 100644
---
a/test/unit/org/apache/cassandra/metrics/ClientRequestRowAndColumnMetricsTest.java
+++
b/test/unit/org/apache/cassandra/metrics/ClientRequestRowAndColumnMetricsTest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.metrics;
-import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
@@ -37,6 +36,7 @@ import org.apache.cassandra.service.paxos.Paxos;
import org.apache.cassandra.transport.SimpleClient;
import org.apache.cassandra.transport.messages.BatchMessage;
import org.apache.cassandra.transport.messages.QueryMessage;
+import org.apache.cassandra.utils.ByteArrayUtil;
import static
org.apache.cassandra.metrics.CassandraMetricsRegistry.METRIC_SCOPE_UNDEFINED;
import static org.apache.cassandra.transport.ProtocolVersion.CURRENT;
@@ -261,7 +261,7 @@ public class ClientRequestRowAndColumnMetricsTest extends
CQLTester
String first = String.format("INSERT INTO %s.%s (pk, v1, v2)
VALUES (1, 10, 100)", KEYSPACE, currentTable());
String second = String.format("INSERT INTO %s.%s (pk, v1, v2)
VALUES (2, 20, 200)", KEYSPACE, currentTable());
- List<List<ByteBuffer>> values =
ImmutableList.of(Collections.emptyList(), Collections.emptyList());
+ List<byte[][]> values =
ImmutableList.of(ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS,
ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS);
BatchMessage batch = new BatchMessage(BatchStatement.Type.LOGGED,
ImmutableList.of(first, second), values, QueryOptions.DEFAULT);
client.execute(batch);
@@ -316,7 +316,7 @@ public class ClientRequestRowAndColumnMetricsTest extends
CQLTester
String first = String.format("INSERT INTO %s.%s (pk, ck, v0, v1,
v2) VALUES (0, 1, 2, 3, 4)", KEYSPACE, currentTable());
String second = String.format("INSERT INTO %s.%s (pk, ck, v0, v1,
v2) VALUES (0, 2, 3, 5, 6)", KEYSPACE, currentTable());
- List<List<ByteBuffer>> values =
ImmutableList.of(Collections.emptyList(), Collections.emptyList());
+ List<byte[][]> values =
ImmutableList.of(ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS,
ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS);
BatchMessage batch = new BatchMessage(BatchStatement.Type.LOGGED,
ImmutableList.of(first, second), values, QueryOptions.DEFAULT);
client.execute(batch);
@@ -377,7 +377,7 @@ public class ClientRequestRowAndColumnMetricsTest extends
CQLTester
String first = String.format("INSERT INTO %s.%s (pk, ck, v1, v2)
VALUES (1, 2, 3, 4)", KEYSPACE, currentTable());
String second = String.format("DELETE FROM %s.%s WHERE pk = 1 AND
ck > 1", KEYSPACE, currentTable());
- List<List<ByteBuffer>> values =
ImmutableList.of(Collections.emptyList(), Collections.emptyList());
+ List<byte[][]> values =
ImmutableList.of(ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS,
ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS);
BatchMessage batch = new BatchMessage(BatchStatement.Type.LOGGED,
ImmutableList.of(first, second), values, QueryOptions.DEFAULT);
client.execute(batch);
diff --git a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
index 42c6cbd021..e7889fca0d 100644
--- a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
+++ b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
@@ -46,6 +46,7 @@ import org.apache.cassandra.transport.messages.ExecuteMessage;
import org.apache.cassandra.transport.messages.PrepareMessage;
import org.apache.cassandra.transport.messages.QueryMessage;
import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.ByteArrayUtil;
import org.apache.cassandra.utils.MD5Digest;
import org.apache.cassandra.utils.ReflectionUtils;
@@ -172,7 +173,7 @@ public class MessagePayloadTest extends CQLTester
BatchMessage batchMessage = new
BatchMessage(BatchStatement.Type.UNLOGGED,
Collections.<Object>singletonList("INSERT INTO " + KEYSPACE + ".atable (pk,v)
VALUES (1, 'foo')"),
-
Collections.singletonList(Collections.<ByteBuffer>emptyList()),
+
Collections.singletonList(ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS),
queryOptions);
reqMap = Collections.singletonMap("foo", bytes(45));
responsePayload = respMap = Collections.singletonMap("bar",
bytes(45));
@@ -241,7 +242,7 @@ public class MessagePayloadTest extends CQLTester
BatchMessage batchMessage = new
BatchMessage(BatchStatement.Type.UNLOGGED,
Collections.<Object>singletonList("INSERT INTO " + KEYSPACE + ".atable (pk,v)
VALUES (1, 'foo')"),
-
Collections.singletonList(Collections.<ByteBuffer>emptyList()),
+
Collections.singletonList(ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS),
QueryOptions.DEFAULT);
reqMap = Collections.singletonMap("foo", bytes(45));
responsePayload = respMap = Collections.singletonMap("bar",
bytes(45));
@@ -342,7 +343,7 @@ public class MessagePayloadTest extends CQLTester
BatchMessage batchMessage = new
BatchMessage(BatchStatement.Type.UNLOGGED,
Collections.<Object>singletonList("INSERT INTO " + KEYSPACE + ".atable (pk,v)
VALUES (1, 'foo')"),
-
Collections.singletonList(Collections.<ByteBuffer>emptyList()),
+
Collections.singletonList(ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS),
QueryOptions.DEFAULT);
reqMap = Collections.singletonMap("foo", bytes(45));
responsePayload = Collections.singletonMap("bar", bytes(45));
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQuery.java
b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQuery.java
index 7358ad876e..8a8ca317a0 100644
--- a/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQuery.java
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQuery.java
@@ -244,11 +244,15 @@ public abstract class FQLQuery implements
Comparable<FQLQuery>
public BinLog.ReleaseableWriteMarshallable toMarshallable()
{
List<String> queryStrings = new ArrayList<>();
- List<List<ByteBuffer>> values = new ArrayList<>();
+ List<byte[][]> values = new ArrayList<>();
for (Single q : queries)
{
queryStrings.add(q.query);
- values.add(q.values);
+ byte[][] valuesAsArray = new byte[q.values.size()][];
+ int i = 0;
+ for (ByteBuffer value : q.values)
+ valuesAsArray[i++] = value.array();
+ values.add(valuesAsArray);
}
return new
FullQueryLogger.Batch(org.apache.cassandra.cql3.statements.BatchStatement.Type.valueOf(batchType.name()),
queryStrings, values, queryOptions, queryState, queryStartTime);
}
diff --git
a/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLReplayTest.java
b/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLReplayTest.java
index 894bfc40da..ef8863a683 100644
--- a/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLReplayTest.java
+++ b/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLReplayTest.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.fqltool.commands.Replay;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.tools.Util;
+import org.apache.cassandra.utils.ByteArrayUtil;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.MergeIterator;
import org.apache.cassandra.utils.Pair;
@@ -705,11 +706,11 @@ public class FQLReplayTest
{
int batchSize = random ? r.nextInt(99) + 1 : i + 1;
List<String> queries = new ArrayList<>(batchSize);
- List<List<ByteBuffer>> values = new ArrayList<>(batchSize);
+ List<byte[][]> values = new ArrayList<>(batchSize);
for (int jj = 0; jj < (random ? r.nextInt(batchSize) :
10); jj++)
{
queries.add("aaaaaa batch "+i+":"+jj);
- values.add(Collections.emptyList());
+ values.add(ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS);
}
FullQueryLogger.Batch batch = new
FullQueryLogger.Batch(BatchStatement.Type.UNLOGGED,
queries,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]