Updated Branches: refs/heads/trunk fc01759f4 -> 7ce5e062e
Correctly handle null in conditions with TTL patch by slebresne; reviewed by iamaleksey for CASSANDRA-6623 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e59ef16b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e59ef16b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e59ef16b Branch: refs/heads/trunk Commit: e59ef16bfcb3bd019202fc12bedeb04302066540 Parents: 58e9481 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Thu Feb 6 08:36:12 2014 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Thu Feb 6 08:36:12 2014 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cql3/statements/ModificationStatement.java | 109 +++++++++++++++---- .../apache/cassandra/service/CASConditions.java | 38 +++++++ .../apache/cassandra/service/StorageProxy.java | 75 ++----------- .../cassandra/thrift/CassandraServer.java | 64 ++++++++++- 5 files changed, 197 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e59ef16b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index bba5f20..7ba8044 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.6 + * Correctly handle null with IF conditions and TTL (CASSANDRA-6623) Merged from 1.2: * Fix partition and range deletes not triggering flush (CASSANDRA-6655) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e59ef16b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index c0bf428..2567043 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -27,19 +27,20 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.cql3.*; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.ColumnSlice; +import org.apache.cassandra.db.filter.IDiskAtomFilter; import org.apache.cassandra.db.filter.SliceQueryFilter; import org.apache.cassandra.db.marshal.CompositeType; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.marshal.ListType; import org.apache.cassandra.db.marshal.BooleanType; import org.apache.cassandra.exceptions.*; +import org.apache.cassandra.service.CASConditions; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.thrift.ThriftValidation; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.Pair; -import org.apache.cassandra.utils.ByteBufferUtil; /* * Abstract parent class of individual modifications, i.e. INSERT, UPDATE and DELETE. @@ -415,16 +416,17 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF UpdateParameters updParams = new UpdateParameters(cfm, variables, queryState.getTimestamp(), getTimeToLive(variables), null); ColumnFamily updates = updateForKey(key, clusteringPrefix, updParams); - // When building the conditions, we should not use the TTL. It's not useful, and if a very low ttl (1 seconds) is used, it's possible - // for it to expire before actually build the conditions which would break since we would then test for the presence of tombstones. - UpdateParameters condParams = new UpdateParameters(cfm, variables, queryState.getTimestamp(), 0, null); - ColumnFamily expected = buildConditions(key, clusteringPrefix, condParams); + // It's cleaner to use the query timestamp below, but it's in seconds while the conditions expects microseconds, so just + // put it back in millis (we don't really lose precision because the ultimate consumer, Column.isLive, re-divide it). + long now = queryState.getTimestamp() * 1000; + CASConditions conditions = ifNotExists + ? new NotExistCondition(clusteringPrefix, now) + : new ColumnsConditions(clusteringPrefix, cfm, key, columnConditions, variables, now); ColumnFamily result = StorageProxy.cas(keyspace(), columnFamily(), key, - clusteringPrefix, - expected, + conditions, updates, options.getSerialConsistency(), options.getConsistency()); @@ -542,28 +544,91 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF return isCounter() ? new CounterMutation(rm, cl) : rm; } - private ColumnFamily buildConditions(ByteBuffer key, ColumnNameBuilder clusteringPrefix, UpdateParameters params) - throws InvalidRequestException + private static abstract class CQL3CasConditions implements CASConditions { - if (ifNotExists) - return null; + protected final ColumnNameBuilder rowPrefix; + protected final long now; - ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfm); + protected CQL3CasConditions(ColumnNameBuilder rowPrefix, long now) + { + this.rowPrefix = rowPrefix; + this.now = now; + } - // CQL row marker - CFDefinition cfDef = cfm.getCfDef(); - if (cfDef.isComposite && !cfDef.isCompact && !cfm.isSuper()) + public IDiskAtomFilter readFilter() { - ByteBuffer name = clusteringPrefix.copy().add(ByteBufferUtil.EMPTY_BYTE_BUFFER).build(); - cf.addColumn(params.makeColumn(name, ByteBufferUtil.EMPTY_BYTE_BUFFER)); + // We always read the row entirely as on CAS failure we want to be able to distinguish between "row exists + // but all values on why there were conditions are null" and "row doesn't exists", and we can't rely on the + // row marker for that (see #6623) + return new SliceQueryFilter(rowPrefix.build(), rowPrefix.buildAsEndOfRange(), false, 1, rowPrefix.componentCount()); } + } - // Conditions - for (Operation condition : columnConditions) - condition.execute(key, cf, clusteringPrefix.copy(), params); + private static class NotExistCondition extends CQL3CasConditions + { + private NotExistCondition(ColumnNameBuilder rowPrefix, long now) + { + super(rowPrefix, now); + } + + public boolean appliesTo(ColumnFamily current) + { + return current == null || current.hasOnlyTombstones(now); + } + } + + private static class ColumnsConditions extends CQL3CasConditions + { + private final ColumnFamily expected; + + private ColumnsConditions(ColumnNameBuilder rowPrefix, + CFMetaData cfm, + ByteBuffer key, + Collection<Operation> conditions, + List<ByteBuffer> variables, + long now) throws InvalidRequestException + { + super(rowPrefix, now); + this.expected = TreeMapBackedSortedColumns.factory.create(cfm); - assert !cf.isEmpty(); - return cf; + // When building the conditions, we should not use a TTL. It's not useful, and if a very low ttl (1 seconds) is used, it's possible + // for it to expire before the actual build of the conditions which would break since we would then testing for the presence of tombstones. + UpdateParameters params = new UpdateParameters(cfm, variables, now, 0, null); + + // Conditions + for (Operation condition : conditions) + condition.execute(key, expected, rowPrefix.copy(), params); + } + + public boolean appliesTo(ColumnFamily current) + { + if (current == null) + return false; + + for (Column e : expected) + { + Column c = current.getColumn(e.name()); + if (e.isLive(now)) + { + if (c == null || !c.isLive(now) || !c.value().equals(e.value())) + return false; + } + else + { + // If we have a tombstone in expected, it means the condition tests that the column is + // null, so check that we have no value + if (c != null && c.isLive(now)) + return false; + } + } + return true; + } + + @Override + public String toString() + { + return expected.toString(); + } } public static abstract class Parsed extends CFStatement http://git-wip-us.apache.org/repos/asf/cassandra/blob/e59ef16b/src/java/org/apache/cassandra/service/CASConditions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CASConditions.java b/src/java/org/apache/cassandra/service/CASConditions.java new file mode 100644 index 0000000..d4b3e19 --- /dev/null +++ b/src/java/org/apache/cassandra/service/CASConditions.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.service; + +import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.filter.IDiskAtomFilter; + +/** + * Abstract the conditions to be fulfilled by a CAS operation. + */ +public interface CASConditions +{ + /** + * The filter to use to fetch the value to compare for the CAS. + */ + public IDiskAtomFilter readFilter(); + + /** + * Returns whether the provided CF, that represents the values fetched using the + * readFilter(), match the CAS conditions this object stands for. + */ + public boolean appliesTo(ColumnFamily current); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e59ef16b/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 5671655..8d1f913 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -41,11 +41,8 @@ import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; -import org.apache.cassandra.cql3.ColumnNameBuilder; import org.apache.cassandra.db.*; import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.filter.NamesQueryFilter; -import org.apache.cassandra.db.filter.SliceQueryFilter; import org.apache.cassandra.db.marshal.UUIDType; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Bounds; @@ -157,7 +154,7 @@ public class StorageProxy implements StorageProxyMBean /** * Apply @param updates if and only if the current values in the row for @param key - * match the ones given by @param expected. The algorithm is "raw" Paxos: that is, Paxos + * match the provided @param conditions. The algorithm is "raw" Paxos: that is, Paxos * minus leader election -- any node in the cluster may propose changes for any row, * which (that is, the row) is the unit of values being proposed, not single columns. * @@ -189,23 +186,18 @@ public class StorageProxy implements StorageProxyMBean * @param keyspaceName the keyspace for the CAS * @param cfName the column family for the CAS * @param key the row key for the row to CAS - * @param prefix a column name prefix that selects the CQL3 row to check if {@code expected} is null. If {@code expected} - * is not null, this is ignored. If {@code expected} is null and this is null, the full row existing is checked (by querying - * the first live column of the row). - * @param expected the expected column values. This can be null to check for existence (see {@code prefix}). - * @param updates the value to insert if {@code expected matches the current values}. + * @param conditions the conditions for the CAS to apply. + * @param updates the value to insert if {@code condtions} matches the current values. * @param consistencyForPaxos the consistency for the paxos prepare and propose round. This can only be either SERIAL or LOCAL_SERIAL. * @param consistencyForCommit the consistency for write done during the commit phase. This can be anything, except SERIAL or LOCAL_SERIAL. * - * @return null if the operation succeeds in updating the row, or the current values for the columns contained in - * expected (since, if the CAS doesn't succeed, it means the current value do not match the one in expected). If - * expected == null and the CAS is unsuccessfull, the first live column of the CF is returned. + * @return null if the operation succeeds in updating the row, or the current values corresponding to conditions. + * (since, if the CAS doesn't succeed, it means the current value do not match the conditions). */ public static ColumnFamily cas(String keyspaceName, String cfName, ByteBuffer key, - ColumnNameBuilder prefix, - ColumnFamily expected, + CASConditions conditions, ColumnFamily updates, ConsistencyLevel consistencyForPaxos, ConsistencyLevel consistencyForCommit) @@ -227,27 +219,15 @@ public class StorageProxy implements StorageProxyMBean UUID ballot = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos); - // read the current value and compare with expected + // read the current values and check they validate the conditions Tracing.trace("Reading existing values for CAS precondition"); long timestamp = System.currentTimeMillis(); - ReadCommand readCommand; - if (expected == null || expected.isEmpty()) - { - SliceQueryFilter filter = prefix == null - ? new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1) - : new SliceQueryFilter(prefix.build(), prefix.buildAsEndOfRange(), false, 1, prefix.componentCount()); - readCommand = new SliceFromReadCommand(keyspaceName, key, cfName, timestamp, filter); - } - else - { - assert !expected.isEmpty(); - readCommand = new SliceByNamesReadCommand(keyspaceName, key, cfName, timestamp, new NamesQueryFilter(ImmutableSortedSet.copyOf(metadata.comparator, expected.getColumnNames()))); - } + ReadCommand readCommand = ReadCommand.create(keyspaceName, key, cfName, timestamp, conditions.readFilter()); List<Row> rows = read(Arrays.asList(readCommand), consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM); ColumnFamily current = rows.get(0).cf; - if (!casApplies(expected, current)) + if (!conditions.appliesTo(current)) { - Tracing.trace("CAS precondition {} does not match current values {}", expected, current); + Tracing.trace("CAS precondition {} does not match current values {}", conditions, current); // We should not return null as this means success return current == null ? EmptyColumns.factory.create(metadata) : current; } @@ -274,41 +254,6 @@ public class StorageProxy implements StorageProxyMBean throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(keyspaceName))); } - private static boolean hasLiveColumns(ColumnFamily cf, long now) - { - return cf != null && !cf.hasOnlyTombstones(now); - } - - private static boolean casApplies(ColumnFamily expected, ColumnFamily current) - { - long now = System.currentTimeMillis(); - - if (!hasLiveColumns(expected, now)) - return !hasLiveColumns(current, now); - else if (!hasLiveColumns(current, now)) - return false; - - // current has been built from expected, so we know that it can't have columns - // that excepted don't have. So we just check that for each columns in expected: - // - if it is a tombstone, whether current has no column or a tombstone; - // - otherwise, that current has a live column with the same value. - for (Column e : expected) - { - Column c = current.getColumn(e.name()); - if (e.isLive(now)) - { - if (!(c != null && c.isLive(now) && c.value().equals(e.value()))) - return false; - } - else - { - if (c != null && c.isLive(now)) - return false; - } - } - return true; - } - private static Predicate<InetAddress> sameDCPredicateFor(final String dc) { final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e59ef16b/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index beaae78..ef5eeb8 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -30,6 +30,7 @@ import java.util.zip.Inflater; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -59,6 +60,7 @@ import org.apache.cassandra.locator.DynamicEndpointSnitch; import org.apache.cassandra.metrics.ClientMetrics; import org.apache.cassandra.scheduler.IRequestScheduler; import org.apache.cassandra.serializers.MarshalException; +import org.apache.cassandra.service.CASConditions; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.MigrationManager; import org.apache.cassandra.service.StorageProxy; @@ -768,8 +770,7 @@ public class CassandraServer implements Cassandra.Iface ColumnFamily result = StorageProxy.cas(cState.getKeyspace(), column_family, key, - null, - cfExpected, + new ThriftCASConditions(cfExpected), cfUpdates, ThriftConversion.fromThrift(serial_consistency_level), ThriftConversion.fromThrift(commit_consistency_level)); @@ -2158,5 +2159,62 @@ public class CassandraServer implements Cassandra.Iface } }); } - // main method moved to CassandraDaemon + + private static class ThriftCASConditions implements CASConditions + { + private final ColumnFamily expected; + + private ThriftCASConditions(ColumnFamily expected) + { + this.expected = expected; + } + + public IDiskAtomFilter readFilter() + { + return expected == null || expected.isEmpty() + ? new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1) + : new NamesQueryFilter(ImmutableSortedSet.copyOf(expected.getComparator(), expected.getColumnNames())); + } + + public boolean appliesTo(ColumnFamily current) + { + long now = System.currentTimeMillis(); + + if (!hasLiveColumns(expected, now)) + return !hasLiveColumns(current, now); + else if (!hasLiveColumns(current, now)) + return false; + + // current has been built from expected, so we know that it can't have columns + // that excepted don't have. So we just check that for each columns in expected: + // - if it is a tombstone, whether current has no column or a tombstone; + // - otherwise, that current has a live column with the same value. + for (org.apache.cassandra.db.Column e : expected) + { + org.apache.cassandra.db.Column c = current.getColumn(e.name()); + if (e.isLive(now)) + { + if (c == null || !c.isLive(now) || !c.value().equals(e.value())) + return false; + } + else + { + if (c != null && c.isLive(now)) + return false; + } + } + return true; + } + + private static boolean hasLiveColumns(ColumnFamily cf, long now) + { + return cf != null && !cf.hasOnlyTombstones(now); + } + + @Override + public String toString() + { + return expected.toString(); + } + } }