Ensure consistent view of partition columns between coordinator and replica in 
ColumnFilter

Patch by Alex Petrov; reviewed by Aleksey Yeschenko for CASSANDRA-13004

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1f54aa42
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1f54aa42
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1f54aa42

Branch: refs/heads/trunk
Commit: 1f54aa424fd8a79089f76951a93560e6bca9d459
Parents: 7b9868c
Author: Alex Petrov <oleksandr.pet...@gmail.com>
Authored: Wed May 31 17:01:14 2017 +0200
Committer: Alex Petrov <oleksandr.pet...@gmail.com>
Committed: Thu Jun 15 19:03:58 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |  23 +++++
 .../org/apache/cassandra/db/ReadResponse.java   |   9 +-
 .../db/commitlog/CommitLogDescriptor.java       |   2 +-
 .../cassandra/db/filter/ColumnFilter.java       | 102 ++++++++++++++-----
 .../apache/cassandra/net/MessagingService.java  |   7 +-
 .../cassandra/service/MigrationManager.java     |  13 ++-
 .../cassandra/db/filter/ColumnFilterTest.java   |  70 +++++++++++++
 8 files changed, 192 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 26462db..528bbcd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.14
+ * Ensure consistent view of partition columns between coordinator and replica 
in ColumnFilter (CASSANDRA-13004)
  * Failed unregistering mbean during drop keyspace (CASSANDRA-13346)
  * nodetool scrub/cleanup/upgradesstables exit code is wrong (CASSANDRA-13542)
  * Fix the reported number of sstable data files accessed per read 
(CASSANDRA-13120)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 6790e6b..00ec48d 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -18,6 +18,29 @@ using the provided 'sstableupgrade' tool.
 
 Upgrading
 ---------
+   - ALTER TABLE (ADD/DROP COLUMN) operations concurrent with a read might
+     result in data corruption (see CASSANDRA-13004 for more details).
+     Fixing this bug required a messaging protocol version bump. By default,
+     Cassandra 3.0.14 will use 3014 version for messaging.
+
+     Since Schema Migrations rely on the exact messaging protocol version
+     match between nodes, if you need schema changes during the upgrade
+     process, you have to start your nodes with 
`-Dcassandra.force_3_0_protocol_version=true`
+     first, in order to temporarily force a backwards compatible protocol.
+     After the whole cluster is upgraded to 3.0.14, do a rolling
+     restart of the cluster without setting that flag.
+
+     3.0.14 nodes with and without the flag set will be able to do schema
+     migrations with other 3.x and 3.0.x releases.
+
+     While running the cluster with the flag set to true on 3.0.14 (in
+     compatibility mode), avoid adding or removing any columns to/from
+     existing tables.
+
+     If your cluster can do without schema migrations during the upgrade
+     time, just start the cluster normally without setting the aforementioned
+     flag.
+
    - If performing a rolling upgrade from 3.0.13, there will be a schema 
mismatch caused
      by a bug with the schema digest calculation in 3.0.13. This will cause 
unnecessary
      but otherwise harmless schema updates, see CASSANDRA-13559 for more 
details.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java 
b/src/java/org/apache/cassandra/db/ReadResponse.java
index 12f0b15..693b52b 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -378,7 +378,7 @@ public abstract class ReadResponse
             if (digest.hasRemaining())
                 return new DigestResponse(digest);
 
-            assert version == MessagingService.VERSION_30;
+            assert version >= MessagingService.VERSION_30;
             ByteBuffer data = ByteBufferUtil.readWithVIntLength(in);
             return new RemoteDataResponse(data);
         }
@@ -413,9 +413,10 @@ public abstract class ReadResponse
             long size = ByteBufferUtil.serializedSizeWithVIntLength(digest);
             if (!isDigest)
             {
-                // Note that we can only get there if version == 3.0, which is 
the current_version. When we'll change the
-                // version, we'll have to deserialize/re-serialize the data to 
be in the proper version.
-                assert version == MessagingService.VERSION_30;
+                // In theory, we should deserialize/re-serialize if the 
version asked is different from the current
+                // version as the content could have a different serialization 
format. So far though, we haven't made
+                // change to partition iterators serialization since 3.0 so we 
skip this.
+                assert version >= MessagingService.VERSION_30;
                 ByteBuffer data = ((DataResponse)response).data;
                 size += ByteBufferUtil.serializedSizeWithVIntLength(data);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java 
b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index 6774d39..0df20ce 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -198,7 +198,7 @@ public class CommitLogDescriptor
             case VERSION_22:
                 return MessagingService.VERSION_22;
             case VERSION_30:
-                return MessagingService.VERSION_30;
+                return MessagingService.FORCE_3_0_PROTOCOL_VERSION ? 
MessagingService.VERSION_30 : MessagingService.VERSION_3014;
             default:
                 throw new IllegalStateException("Unknown commitlog version " + 
version);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java 
b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index df91781..c28c0ae 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.rows.CellPath;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
 
 /**
  * Represents which (non-PK) columns (and optionally which sub-part of a 
column for complex columns) are selected
@@ -52,23 +53,27 @@ public class ColumnFilter
     public static final Serializer serializer = new Serializer();
 
     // Distinguish between the 2 cases described above: if 'isFetchAll' is 
true, then all columns will be retrieved
-    // by the query, but the values for column/cells not selected by 
'selection' and 'subSelections' will be skipped.
-    // Otherwise, only the column/cells returned by 'selection' and 
'subSelections' will be returned at all.
+    // by the query, but the values for column/cells not selected by 'queried' 
and 'subSelections' will be skipped.
+    // Otherwise, only the column/cells returned by 'queried' and 
'subSelections' will be returned at all.
     private final boolean isFetchAll;
 
-    private final CFMetaData metadata; // can be null if !isFetchAll
-
-    private final PartitionColumns selection; // can be null if isFetchAll and 
we don't want to skip any value
+    private final PartitionColumns queried; // can be null if isFetchAll and 
we don't want to skip any value
+    private final PartitionColumns fetched;
     private final SortedSetMultimap<ColumnIdentifier, ColumnSubselection> 
subSelections; // can be null
 
+    /**
+     * Used on replica for deserialisation
+     */
     private ColumnFilter(boolean isFetchAll,
-                         CFMetaData metadata,
-                         PartitionColumns columns,
+                         PartitionColumns fetched,
+                         PartitionColumns queried,
                          SortedSetMultimap<ColumnIdentifier, 
ColumnSubselection> subSelections)
     {
+        assert !isFetchAll || fetched != null;
+        assert isFetchAll || queried != null;
         this.isFetchAll = isFetchAll;
-        this.metadata = metadata;
-        this.selection = columns;
+        this.fetched = isFetchAll ? fetched : queried;
+        this.queried = queried;
         this.subSelections = subSelections;
     }
 
@@ -77,7 +82,7 @@ public class ColumnFilter
      */
     public static ColumnFilter all(CFMetaData metadata)
     {
-        return new ColumnFilter(true, metadata, null, null);
+        return new ColumnFilter(true, metadata.partitionColumns(), null, null);
     }
 
     /**
@@ -98,7 +103,7 @@ public class ColumnFilter
      */
     public static ColumnFilter selection(CFMetaData metadata, PartitionColumns 
queried)
     {
-        return new ColumnFilter(true, metadata, queried, null);
+        return new ColumnFilter(true, metadata.partitionColumns(), queried, 
null);
     }
 
     /**
@@ -111,7 +116,7 @@ public class ColumnFilter
      */
     public PartitionColumns fetchedColumns()
     {
-        return isFetchAll ? metadata.partitionColumns() : selection;
+        return fetched;
     }
 
     public boolean includesAllColumns()
@@ -124,7 +129,7 @@ public class ColumnFilter
      */
     public boolean includes(ColumnDefinition column)
     {
-        return isFetchAll || selection.contains(column);
+        return isFetchAll || queried.contains(column);
     }
 
     /**
@@ -301,7 +306,7 @@ public class ColumnFilter
             boolean isFetchAll = metadata != null;
 
             PartitionColumns selectedColumns = selection == null ? null : 
selection.build();
-            // It's only ok to have selection == null in ColumnFilter if 
isFetchAll. So deal with the case of a "selection" builder
+            // It's only ok to have queried == null in ColumnFilter if 
isFetchAll. So deal with the case of a "selection" builder
             // with nothing selected (we can at least happen on some backward 
compatible queries - CASSANDRA-10471).
             if (!isFetchAll && selectedColumns == null)
                 selectedColumns = PartitionColumns.NONE;
@@ -314,20 +319,37 @@ public class ColumnFilter
                     s.put(subSelection.column().name, subSelection);
             }
 
-            return new ColumnFilter(isFetchAll, metadata, selectedColumns, s);
+            return new ColumnFilter(isFetchAll, isFetchAll ? 
metadata.partitionColumns() : selectedColumns, selectedColumns, s);
         }
     }
 
     @Override
+    public boolean equals(Object other)
+    {
+        if (other == this)
+            return true;
+
+        if (!(other instanceof ColumnFilter))
+            return false;
+
+        ColumnFilter otherCf = (ColumnFilter) other;
+
+        return otherCf.isFetchAll == this.isFetchAll &&
+               Objects.equals(otherCf.fetched, this.fetched) &&
+               Objects.equals(otherCf.queried, this.queried) &&
+               Objects.equals(otherCf.subSelections, this.subSelections);
+
+    }
+    @Override
     public String toString()
     {
         if (isFetchAll)
             return "*";
 
-        if (selection.isEmpty())
+        if (queried.isEmpty())
             return "";
 
-        Iterator<ColumnDefinition> defs = selection.selectOrderIterator();
+        Iterator<ColumnDefinition> defs = queried.selectOrderIterator();
         if (!defs.hasNext())
             return "<none>";
 
@@ -367,7 +389,7 @@ public class ColumnFilter
         private static int makeHeaderByte(ColumnFilter selection)
         {
             return (selection.isFetchAll ? IS_FETCH_ALL_MASK : 0)
-                 | (selection.selection != null ? HAS_SELECTION_MASK : 0)
+                 | (selection.queried != null ? HAS_SELECTION_MASK : 0)
                  | (selection.subSelections != null ? HAS_SUB_SELECTIONS_MASK 
: 0);
         }
 
@@ -375,10 +397,16 @@ public class ColumnFilter
         {
             out.writeByte(makeHeaderByte(selection));
 
-            if (selection.selection != null)
+            if (version >= MessagingService.VERSION_3014 && 
selection.isFetchAll)
+            {
+                Columns.serializer.serialize(selection.fetched.statics, out);
+                Columns.serializer.serialize(selection.fetched.regulars, out);
+            }
+
+            if (selection.queried != null)
             {
-                Columns.serializer.serialize(selection.selection.statics, out);
-                Columns.serializer.serialize(selection.selection.regulars, 
out);
+                Columns.serializer.serialize(selection.queried.statics, out);
+                Columns.serializer.serialize(selection.queried.regulars, out);
             }
 
             if (selection.subSelections != null)
@@ -396,7 +424,23 @@ public class ColumnFilter
             boolean hasSelection = (header & HAS_SELECTION_MASK) != 0;
             boolean hasSubSelections = (header & HAS_SUB_SELECTIONS_MASK) != 0;
 
+            PartitionColumns fetched = null;
             PartitionColumns selection = null;
+
+            if (isFetchAll)
+            {
+                if (version >= MessagingService.VERSION_3014)
+                {
+                    Columns statics = Columns.serializer.deserialize(in, 
metadata);
+                    Columns regulars = Columns.serializer.deserialize(in, 
metadata);
+                    fetched = new PartitionColumns(statics, regulars);
+                }
+                else
+                {
+                    fetched = metadata.partitionColumns();
+                }
+            }
+
             if (hasSelection)
             {
                 Columns statics = Columns.serializer.deserialize(in, metadata);
@@ -416,17 +460,23 @@ public class ColumnFilter
                 }
             }
 
-            return new ColumnFilter(isFetchAll, isFetchAll ? metadata : null, 
selection, subSelections);
+            return new ColumnFilter(isFetchAll, fetched, selection, 
subSelections);
         }
 
         public long serializedSize(ColumnFilter selection, int version)
         {
             long size = 1; // header byte
 
-            if (selection.selection != null)
+            if (version >= MessagingService.VERSION_3014 && 
selection.isFetchAll)
+            {
+                size += 
Columns.serializer.serializedSize(selection.fetched.statics);
+                size += 
Columns.serializer.serializedSize(selection.fetched.regulars);
+            }
+
+            if (selection.queried != null)
             {
-                size += 
Columns.serializer.serializedSize(selection.selection.statics);
-                size += 
Columns.serializer.serializedSize(selection.selection.regulars);
+                size += 
Columns.serializer.serializedSize(selection.queried.statics);
+                size += 
Columns.serializer.serializedSize(selection.queried.regulars);
             }
 
             if (selection.subSelections != null)
@@ -440,4 +490,4 @@ public class ColumnFilter
             return size;
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index 4aaf49b..e0f77b7 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -80,6 +80,10 @@ import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
 public final class MessagingService implements MessagingServiceMBean
 {
+    // Required to allow schema migrations while upgrading within the minor 
3.0.x versions to 3.0.14.
+    // See CASSANDRA-13004 for details.
+    public final static boolean FORCE_3_0_PROTOCOL_VERSION = 
Boolean.getBoolean("cassandra.force_3_0_protocol_version");
+
     public static final String MBEAN_NAME = 
"org.apache.cassandra.net:type=MessagingService";
 
     // 8 bits version, so don't waste versions
@@ -88,7 +92,8 @@ public final class MessagingService implements 
MessagingServiceMBean
     public static final int VERSION_21 = 8;
     public static final int VERSION_22 = 9;
     public static final int VERSION_30 = 10;
-    public static final int current_version = VERSION_30;
+    public static final int VERSION_3014 = 11;
+    public static final int current_version = FORCE_3_0_PROTOCOL_VERSION ? 
VERSION_30 : VERSION_3014;
 
     public static final String FAILURE_CALLBACK_PARAM = "CAL_BAC";
     public static final byte[] ONE_BYTE = new byte[1];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java 
b/src/java/org/apache/cassandra/service/MigrationManager.java
index aacb769..7b7cd8f 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -144,10 +144,17 @@ public class MigrationManager
          * Don't request schema from fat clients
          */
         return MessagingService.instance().knowsVersion(endpoint)
-                && MessagingService.instance().getRawVersion(endpoint) == 
MessagingService.current_version
+                && 
is30Compatible(MessagingService.instance().getRawVersion(endpoint))
                 && !Gossiper.instance.isGossipOnlyMember(endpoint);
     }
 
+    // Since 3.0.14 protocol contains only a CASSANDRA-13004 bugfix, it is 
safe to accept schema changes
+    // from both 3.0 and 3.0.14.
+    private static boolean is30Compatible(int version)
+    {
+        return version == MessagingService.current_version || version == 
MessagingService.VERSION_3014;
+    }
+
     public static boolean isReadyForBootstrap()
     {
         return MigrationTask.getInflightTasks().isEmpty();
@@ -541,8 +548,8 @@ public class MigrationManager
         {
             // only push schema to nodes with known and equal versions
             if (!endpoint.equals(FBUtilities.getBroadcastAddress()) &&
-                    MessagingService.instance().knowsVersion(endpoint) &&
-                    MessagingService.instance().getRawVersion(endpoint) == 
MessagingService.current_version)
+                MessagingService.instance().knowsVersion(endpoint) &&
+                
is30Compatible(MessagingService.instance().getRawVersion(endpoint)))
                 pushSchemaMutation(endpoint, schema);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java 
b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
new file mode 100644
index 0000000..aa56091
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.filter;
+
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class ColumnFilterTest
+{
+    final static ColumnFilter.Serializer serializer = new 
ColumnFilter.Serializer();
+
+    @Test
+    public void columnFilterSerialisationRoundTrip() throws Exception
+    {
+        CFMetaData metadata = CFMetaData.Builder.create("ks", "table")
+                                                
.withPartitioner(Murmur3Partitioner.instance)
+                                                .addPartitionKey("pk", 
Int32Type.instance)
+                                                .addClusteringColumn("ck", 
Int32Type.instance)
+                                                .addRegularColumn("v1", 
Int32Type.instance)
+                                                .addRegularColumn("v2", 
Int32Type.instance)
+                                                .addRegularColumn("v3", 
Int32Type.instance)
+                                                .build();
+
+        ColumnDefinition v1 = 
metadata.getColumnDefinition(ByteBufferUtil.bytes("v1"));
+
+        testRoundTrip(ColumnFilter.all(metadata), metadata, 
MessagingService.VERSION_30);
+        testRoundTrip(ColumnFilter.all(metadata), metadata, 
MessagingService.VERSION_3014);
+
+        
testRoundTrip(ColumnFilter.selection(metadata.partitionColumns().without(v1)), 
metadata, MessagingService.VERSION_30);
+        
testRoundTrip(ColumnFilter.selection(metadata.partitionColumns().without(v1)), 
metadata, MessagingService.VERSION_3014);
+
+        testRoundTrip(ColumnFilter.selection(metadata, 
metadata.partitionColumns().without(v1)), metadata, 
MessagingService.VERSION_30);
+        testRoundTrip(ColumnFilter.selection(metadata, 
metadata.partitionColumns().without(v1)), metadata, 
MessagingService.VERSION_3014);
+    }
+
+    static void testRoundTrip(ColumnFilter columnFilter, CFMetaData metadata, 
int version) throws Exception
+    {
+        DataOutputBuffer output = new DataOutputBuffer();
+        serializer.serialize(columnFilter, output, version);
+        Assert.assertEquals(serializer.serializedSize(columnFilter, version), 
output.position());
+        DataInputPlus input = new DataInputBuffer(output.buffer(), false);
+        Assert.assertEquals(serializer.deserialize(input, version, metadata), 
columnFilter);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to