Ensure consistent view of partition columns between coordinator and replica in ColumnFilter
Patch by Alex Petrov; reviewed by Aleksey Yeschenko for CASSANDRA-13004 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1f54aa42 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1f54aa42 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1f54aa42 Branch: refs/heads/cassandra-3.11 Commit: 1f54aa424fd8a79089f76951a93560e6bca9d459 Parents: 7b9868c Author: Alex Petrov <oleksandr.pet...@gmail.com> Authored: Wed May 31 17:01:14 2017 +0200 Committer: Alex Petrov <oleksandr.pet...@gmail.com> Committed: Thu Jun 15 19:03:58 2017 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 23 +++++ .../org/apache/cassandra/db/ReadResponse.java | 9 +- .../db/commitlog/CommitLogDescriptor.java | 2 +- .../cassandra/db/filter/ColumnFilter.java | 102 ++++++++++++++----- .../apache/cassandra/net/MessagingService.java | 7 +- .../cassandra/service/MigrationManager.java | 13 ++- .../cassandra/db/filter/ColumnFilterTest.java | 70 +++++++++++++ 8 files changed, 192 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 26462db..528bbcd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.14 + * Ensure consistent view of partition columns between coordinator and replica in ColumnFilter (CASSANDRA-13004) * Failed unregistering mbean during drop keyspace (CASSANDRA-13346) * nodetool scrub/cleanup/upgradesstables exit code is wrong (CASSANDRA-13542) * Fix the reported number of sstable data files accessed per read (CASSANDRA-13120) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt 
index 6790e6b..00ec48d 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -18,6 +18,29 @@ using the provided 'sstableupgrade' tool. Upgrading --------- + - ALTER TABLE (ADD/DROP COLUMN) operations concurrent with a read might + result in data corruption (see CASSANDRA-13004 for more details). + Fixing this bug required a messaging protocol version bump. By default, + Cassandra 3.0.14 will use 3014 version for messaging. + + Since Schema Migrations rely on the exact messaging protocol version + match between nodes, if you need schema changes during the upgrade + process, you have to start your nodes with `-Dcassandra.force_3_0_protocol_version=true` + first, in order to temporarily force a backwards compatible protocol. + After the whole cluster is upgraded to 3.0.14, do a rolling + restart of the cluster without setting that flag. + + 3.0.14 nodes with and without the flag set will be able to do schema + migrations with other 3.x and 3.0.x releases. + + While running the cluster with the flag set to true on 3.0.14 (in + compatibility mode), avoid adding or removing any columns to/from + existing tables. + + If your cluster can do without schema migrations during the upgrade + time, just start the cluster normally without setting the aforementioned + flag. + - If performing a rolling upgrade from 3.0.13, there will be a schema mismatch caused by a bug with the schema digest calculation in 3.0.13. This will cause unnecessary but otherwise harmless schema updates, see CASSANDRA-13559 for more details. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/src/java/org/apache/cassandra/db/ReadResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java index 12f0b15..693b52b 100644 --- a/src/java/org/apache/cassandra/db/ReadResponse.java +++ b/src/java/org/apache/cassandra/db/ReadResponse.java @@ -378,7 +378,7 @@ public abstract class ReadResponse if (digest.hasRemaining()) return new DigestResponse(digest); - assert version == MessagingService.VERSION_30; + assert version >= MessagingService.VERSION_30; ByteBuffer data = ByteBufferUtil.readWithVIntLength(in); return new RemoteDataResponse(data); } @@ -413,9 +413,10 @@ public abstract class ReadResponse long size = ByteBufferUtil.serializedSizeWithVIntLength(digest); if (!isDigest) { - // Note that we can only get there if version == 3.0, which is the current_version. When we'll change the - // version, we'll have to deserialize/re-serialize the data to be in the proper version. - assert version == MessagingService.VERSION_30; + // In theory, we should deserialize/re-serialize if the version asked is different from the current + // version as the content could have a different serialization format. So far though, we haven't made + // change to partition iterators serialization since 3.0 so we skip this. 
+ assert version >= MessagingService.VERSION_30; ByteBuffer data = ((DataResponse)response).data; size += ByteBufferUtil.serializedSizeWithVIntLength(data); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java index 6774d39..0df20ce 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java @@ -198,7 +198,7 @@ public class CommitLogDescriptor case VERSION_22: return MessagingService.VERSION_22; case VERSION_30: - return MessagingService.VERSION_30; + return MessagingService.FORCE_3_0_PROTOCOL_VERSION ? MessagingService.VERSION_30 : MessagingService.VERSION_3014; default: throw new IllegalStateException("Unknown commitlog version " + version); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/src/java/org/apache/cassandra/db/filter/ColumnFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java index df91781..c28c0ae 100644 --- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java @@ -31,6 +31,7 @@ import org.apache.cassandra.db.rows.CellPath; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; /** * Represents which (non-PK) columns (and optionally which sub-part of a column for complex columns) are selected @@ -52,23 +53,27 @@ public class ColumnFilter public static final Serializer 
serializer = new Serializer(); // Distinguish between the 2 cases described above: if 'isFetchAll' is true, then all columns will be retrieved - // by the query, but the values for column/cells not selected by 'selection' and 'subSelections' will be skipped. - // Otherwise, only the column/cells returned by 'selection' and 'subSelections' will be returned at all. + // by the query, but the values for column/cells not selected by 'queried' and 'subSelections' will be skipped. + // Otherwise, only the column/cells returned by 'queried' and 'subSelections' will be returned at all. private final boolean isFetchAll; - private final CFMetaData metadata; // can be null if !isFetchAll - - private final PartitionColumns selection; // can be null if isFetchAll and we don't want to skip any value + private final PartitionColumns queried; // can be null if isFetchAll and we don't want to skip any value + private final PartitionColumns fetched; private final SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections; // can be null + /** + * Used on replica for deserialisation + */ private ColumnFilter(boolean isFetchAll, - CFMetaData metadata, - PartitionColumns columns, + PartitionColumns fetched, + PartitionColumns queried, SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections) { + assert !isFetchAll || fetched != null; + assert isFetchAll || queried != null; this.isFetchAll = isFetchAll; - this.metadata = metadata; - this.selection = columns; + this.fetched = isFetchAll ? 
fetched : queried; + this.queried = queried; this.subSelections = subSelections; } @@ -77,7 +82,7 @@ public class ColumnFilter */ public static ColumnFilter all(CFMetaData metadata) { - return new ColumnFilter(true, metadata, null, null); + return new ColumnFilter(true, metadata.partitionColumns(), null, null); } /** @@ -98,7 +103,7 @@ public class ColumnFilter */ public static ColumnFilter selection(CFMetaData metadata, PartitionColumns queried) { - return new ColumnFilter(true, metadata, queried, null); + return new ColumnFilter(true, metadata.partitionColumns(), queried, null); } /** @@ -111,7 +116,7 @@ public class ColumnFilter */ public PartitionColumns fetchedColumns() { - return isFetchAll ? metadata.partitionColumns() : selection; + return fetched; } public boolean includesAllColumns() @@ -124,7 +129,7 @@ public class ColumnFilter */ public boolean includes(ColumnDefinition column) { - return isFetchAll || selection.contains(column); + return isFetchAll || queried.contains(column); } /** @@ -301,7 +306,7 @@ public class ColumnFilter boolean isFetchAll = metadata != null; PartitionColumns selectedColumns = selection == null ? null : selection.build(); - // It's only ok to have selection == null in ColumnFilter if isFetchAll. So deal with the case of a "selection" builder + // It's only ok to have queried == null in ColumnFilter if isFetchAll. So deal with the case of a "selection" builder // with nothing selected (we can at least happen on some backward compatible queries - CASSANDRA-10471). if (!isFetchAll && selectedColumns == null) selectedColumns = PartitionColumns.NONE; @@ -314,20 +319,37 @@ public class ColumnFilter s.put(subSelection.column().name, subSelection); } - return new ColumnFilter(isFetchAll, metadata, selectedColumns, s); + return new ColumnFilter(isFetchAll, isFetchAll ? 
metadata.partitionColumns() : selectedColumns, selectedColumns, s); } } @Override + public boolean equals(Object other) + { + if (other == this) + return true; + + if (!(other instanceof ColumnFilter)) + return false; + + ColumnFilter otherCf = (ColumnFilter) other; + + return otherCf.isFetchAll == this.isFetchAll && + Objects.equals(otherCf.fetched, this.fetched) && + Objects.equals(otherCf.queried, this.queried) && + Objects.equals(otherCf.subSelections, this.subSelections); + + } + @Override public String toString() { if (isFetchAll) return "*"; - if (selection.isEmpty()) + if (queried.isEmpty()) return ""; - Iterator<ColumnDefinition> defs = selection.selectOrderIterator(); + Iterator<ColumnDefinition> defs = queried.selectOrderIterator(); if (!defs.hasNext()) return "<none>"; @@ -367,7 +389,7 @@ public class ColumnFilter private static int makeHeaderByte(ColumnFilter selection) { return (selection.isFetchAll ? IS_FETCH_ALL_MASK : 0) - | (selection.selection != null ? HAS_SELECTION_MASK : 0) + | (selection.queried != null ? HAS_SELECTION_MASK : 0) | (selection.subSelections != null ? 
HAS_SUB_SELECTIONS_MASK : 0); } @@ -375,10 +397,16 @@ public class ColumnFilter { out.writeByte(makeHeaderByte(selection)); - if (selection.selection != null) + if (version >= MessagingService.VERSION_3014 && selection.isFetchAll) + { + Columns.serializer.serialize(selection.fetched.statics, out); + Columns.serializer.serialize(selection.fetched.regulars, out); + } + + if (selection.queried != null) { - Columns.serializer.serialize(selection.selection.statics, out); - Columns.serializer.serialize(selection.selection.regulars, out); + Columns.serializer.serialize(selection.queried.statics, out); + Columns.serializer.serialize(selection.queried.regulars, out); } if (selection.subSelections != null) @@ -396,7 +424,23 @@ public class ColumnFilter boolean hasSelection = (header & HAS_SELECTION_MASK) != 0; boolean hasSubSelections = (header & HAS_SUB_SELECTIONS_MASK) != 0; + PartitionColumns fetched = null; PartitionColumns selection = null; + + if (isFetchAll) + { + if (version >= MessagingService.VERSION_3014) + { + Columns statics = Columns.serializer.deserialize(in, metadata); + Columns regulars = Columns.serializer.deserialize(in, metadata); + fetched = new PartitionColumns(statics, regulars); + } + else + { + fetched = metadata.partitionColumns(); + } + } + if (hasSelection) { Columns statics = Columns.serializer.deserialize(in, metadata); @@ -416,17 +460,23 @@ public class ColumnFilter } } - return new ColumnFilter(isFetchAll, isFetchAll ? 
metadata : null, selection, subSelections); + return new ColumnFilter(isFetchAll, fetched, selection, subSelections); } public long serializedSize(ColumnFilter selection, int version) { long size = 1; // header byte - if (selection.selection != null) + if (version >= MessagingService.VERSION_3014 && selection.isFetchAll) + { + size += Columns.serializer.serializedSize(selection.fetched.statics); + size += Columns.serializer.serializedSize(selection.fetched.regulars); + } + + if (selection.queried != null) { - size += Columns.serializer.serializedSize(selection.selection.statics); - size += Columns.serializer.serializedSize(selection.selection.regulars); + size += Columns.serializer.serializedSize(selection.queried.statics); + size += Columns.serializer.serializedSize(selection.queried.regulars); } if (selection.subSelections != null) @@ -440,4 +490,4 @@ public class ColumnFilter return size; } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 4aaf49b..e0f77b7 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -80,6 +80,10 @@ import org.apache.cassandra.utils.concurrent.SimpleCondition; public final class MessagingService implements MessagingServiceMBean { + // Required to allow schema migrations while upgrading within the minor 3.0.x versions to 3.0.14. + // See CASSANDRA-13004 for details. 
+ public final static boolean FORCE_3_0_PROTOCOL_VERSION = Boolean.getBoolean("cassandra.force_3_0_protocol_version"); + public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService"; // 8 bits version, so don't waste versions @@ -88,7 +92,8 @@ public final class MessagingService implements MessagingServiceMBean public static final int VERSION_21 = 8; public static final int VERSION_22 = 9; public static final int VERSION_30 = 10; - public static final int current_version = VERSION_30; + public static final int VERSION_3014 = 11; + public static final int current_version = FORCE_3_0_PROTOCOL_VERSION ? VERSION_30 : VERSION_3014; public static final String FAILURE_CALLBACK_PARAM = "CAL_BAC"; public static final byte[] ONE_BYTE = new byte[1]; http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/src/java/org/apache/cassandra/service/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java index aacb769..7b7cd8f 100644 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@ -144,10 +144,17 @@ public class MigrationManager * Don't request schema from fat clients */ return MessagingService.instance().knowsVersion(endpoint) - && MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version + && is30Compatible(MessagingService.instance().getRawVersion(endpoint)) && !Gossiper.instance.isGossipOnlyMember(endpoint); } + // Since 3.0.14 protocol contains only a CASSANDRA-13004 bugfix, it is safe to accept schema changes + // from both 3.0 and 3.0.14. 
+ private static boolean is30Compatible(int version) + { + return version == MessagingService.current_version || version == MessagingService.VERSION_3014; + } + public static boolean isReadyForBootstrap() { return MigrationTask.getInflightTasks().isEmpty(); @@ -541,8 +548,8 @@ public class MigrationManager { // only push schema to nodes with known and equal versions if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && - MessagingService.instance().knowsVersion(endpoint) && - MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version) + MessagingService.instance().knowsVersion(endpoint) && + is30Compatible(MessagingService.instance().getRawVersion(endpoint))) pushSchemaMutation(endpoint, schema); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java new file mode 100644 index 0000000..aa56091 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.filter; + +import org.junit.Test; + +import junit.framework.Assert; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.ByteBufferUtil; + +public class ColumnFilterTest +{ + final static ColumnFilter.Serializer serializer = new ColumnFilter.Serializer(); + + @Test + public void columnFilterSerialisationRoundTrip() throws Exception + { + CFMetaData metadata = CFMetaData.Builder.create("ks", "table") + .withPartitioner(Murmur3Partitioner.instance) + .addPartitionKey("pk", Int32Type.instance) + .addClusteringColumn("ck", Int32Type.instance) + .addRegularColumn("v1", Int32Type.instance) + .addRegularColumn("v2", Int32Type.instance) + .addRegularColumn("v3", Int32Type.instance) + .build(); + + ColumnDefinition v1 = metadata.getColumnDefinition(ByteBufferUtil.bytes("v1")); + + testRoundTrip(ColumnFilter.all(metadata), metadata, MessagingService.VERSION_30); + testRoundTrip(ColumnFilter.all(metadata), metadata, MessagingService.VERSION_3014); + + testRoundTrip(ColumnFilter.selection(metadata.partitionColumns().without(v1)), metadata, MessagingService.VERSION_30); + testRoundTrip(ColumnFilter.selection(metadata.partitionColumns().without(v1)), metadata, MessagingService.VERSION_3014); + + testRoundTrip(ColumnFilter.selection(metadata, metadata.partitionColumns().without(v1)), metadata, MessagingService.VERSION_30); + testRoundTrip(ColumnFilter.selection(metadata, metadata.partitionColumns().without(v1)), metadata, 
MessagingService.VERSION_3014); + } + + static void testRoundTrip(ColumnFilter columnFilter, CFMetaData metadata, int version) throws Exception + { + DataOutputBuffer output = new DataOutputBuffer(); + serializer.serialize(columnFilter, output, version); + Assert.assertEquals(serializer.serializedSize(columnFilter, version), output.position()); + DataInputPlus input = new DataInputBuffer(output.buffer(), false); + Assert.assertEquals(serializer.deserialize(input, version, metadata), columnFilter); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org