aweisberg commented on code in PR #4692:
URL: https://github.com/apache/cassandra/pull/4692#discussion_r3244384642


##########
src/java/org/apache/cassandra/db/compaction/CompactionManager.java:
##########
@@ -789,12 +805,17 @@ public AllSSTableOpStatus performCleanup(final ColumnFamilyStore cfStore, int jo
         if (partitioner.getClass() == LocalPartitioner.class)
             localWrites = RangesAtEndpoint.of(Replica.fullReplica(local, new Range<>(partitioner.getMinimumToken(), partitioner.getMinimumToken())));
 
-        final Set<Range<Token>> allRanges = new HashSet<>(localWrites.ranges());
+        InUseRanges inUseRanges = getInUseRanges(cfStore.getTableId(), localWrites);
+
+        final List<Range<Token>> noLongerOwnedRangesInUseByAccord = inUseRanges.noLongerOwnedRangesInUseByAccord;
+        final List<Range<Token>> allRanges = inUseRanges.allRanges;
         final Set<Range<Token>> transientRanges = new HashSet<>(localWrites.onlyTransient().ranges());
         final Set<Range<Token>> fullRanges = new HashSet<>(localWrites.onlyFull().ranges());
 
         return parallelAllSSTableOperation(cfStore, new OneSSTableOperation()
         {
+            boolean incompleteOperation = false;

Review Comment:
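   I made a mistake. This needs to be an AtomicBoolean that is set with a CAS.
   The |= is a non-atomic read-modify-write, so a thread that read false before
   another thread set the flag could write false back, turning an
   incompleteOperation that is already true back to false.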
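
   A minimal sketch of the sticky-flag idea, not the PR's code: the class
   IncompleteFlagSketch and its methods record and runTasks are hypothetical
   names used only for illustration. It assumes each parallel task reports a
   boolean and that the flag must never go from true back to false.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Illustrative only: a sticky "incomplete" flag shared by parallel tasks. */
final class IncompleteFlagSketch
{
    private final AtomicBoolean incomplete = new AtomicBoolean(false);

    /** Records one task's result; a false result never clears the flag. */
    void record(boolean taskWasIncomplete)
    {
        if (taskWasIncomplete)
            incomplete.compareAndSet(false, true); // a failed CAS means it is already true
    }

    boolean incompleteOperation()
    {
        return incomplete.get();
    }

    /** Runs one thread per result and returns the flag after all threads finish. */
    static boolean runTasks(boolean[] results)
    {
        IncompleteFlagSketch flag = new IncompleteFlagSketch();
        Thread[] threads = new Thread[results.length];
        for (int i = 0; i < results.length; i++)
        {
            final boolean result = results[i];
            threads[i] = new Thread(() -> flag.record(result));
            threads[i].start();
        }
        try
        {
            for (Thread thread : threads)
                thread.join();
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        }
        return flag.incompleteOperation();
    }
}
```

   Because the only write is false -> true, no interleaving of tasks can lose
   an earlier true, which is the property the plain boolean with |= lacks.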



##########
src/java/org/apache/cassandra/db/compaction/CompactionManager.java:
##########
@@ -834,11 +865,50 @@ public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
             public void execute(LifecycleTransaction txn) throws IOException
             {
                 CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, allRanges, transientRanges, txn.onlyOne().isRepaired(), FBUtilities.nowInSeconds());
-                doCleanupOne(cfStore, txn, cleanupStrategy, allRanges, hasIndexes);
+                this.incompleteOperation |= (doCleanupOne(cfStore, txn, cleanupStrategy, allRanges, hasIndexes, this.incompleteOperation, noLongerOwnedRangesInUseByAccord));
+            }
+
+            @Override
+            public boolean incompleteOperation()
+            {
+                return this.incompleteOperation;
             }
         }, jobs, OperationType.CLEANUP);
     }
 
+    public static class InUseRanges
+    {
+        public List<Range<Token>> noLongerOwnedRangesInUseByAccord;
+        public List<Range<Token>> allRanges;
+
+        public InUseRanges(List<Range<Token>> noLongerOwnedRangesInUseByAccord, List<Range<Token>> allRanges)
+        {
+            this.noLongerOwnedRangesInUseByAccord = noLongerOwnedRangesInUseByAccord;
+            this.allRanges = allRanges;
+        }
+    }
+
+    private InUseRanges getInUseRanges(TableId tableId, RangesAtEndpoint localWrites)
+    {
+        List<Range<Token>> noLongerOwnedRangesInUseByAccord = new ArrayList<>();
+        TableMetadata metadata = Schema.instance.getTableMetadata(tableId);
+
+        if (metadata != null && metadata.isAccordEnabled())

Review Comment:
   This check is not migration-aware. We need to keep the table when Accord is
   in the process of being enabled, when it is enabled, and when it is in the
   process of being disabled.
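
   A rough sketch of the three-state condition described above. The enum
   AccordState and the method mustConsultAccord are hypothetical; the real
   schema types may model migration differently, and this reads "keep the
   table" as "still consult Accord's in-use ranges for it", which is an
   assumption.

```java
import java.util.EnumSet;
import java.util.Set;

/** Illustrative only: which lifecycle states should still consult Accord. */
final class AccordMigrationSketch
{
    /** Hypothetical lifecycle states, not Cassandra's actual types. */
    enum AccordState { DISABLED, ENABLING, ENABLED, DISABLING }

    /** States in which Accord may still be using ranges for the table. */
    private static final Set<AccordState> MAY_HOLD_RANGES =
        EnumSet.of(AccordState.ENABLING, AccordState.ENABLED, AccordState.DISABLING);

    /** A null state stands in for a table whose metadata could not be found. */
    static boolean mustConsultAccord(AccordState state)
    {
        return state != null && MAY_HOLD_RANGES.contains(state);
    }
}
```

   Only a table that is fully disabled (or missing) skips the Accord lookup;
   both migration directions are treated the same as enabled.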



##########
test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolCleanupTest.java:
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.accord;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.accord.api.TokenKey;
+
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.RoutingKey;
+import accord.primitives.Ranges;
+
+public class AccordNodetoolCleanupTest extends AccordTestBase
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(AccordNodetoolCleanupTest.class);
+
+    @Override
+    protected Logger logger()
+    {
+        return logger;
+    }
+
+    @BeforeClass
+    public static void setupClass() throws IOException
+    {
+        AccordTestBase.setupCluster(builder -> builder
+                                               .withoutVNodes()
+                                               .appendConfig(config -> config
+                                                                       
.set("accord.shard_durability_cycle", "20s")
+                                                                       
.with(Feature.GOSSIP, Feature.NETWORK)), 2);
+        SHARED_CLUSTER.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE + 
';');
+        SHARED_CLUSTER.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}");
+    }
+
+    @Test
+    public void accordNodetoolCleanupTest() throws Throwable
+    {
+        String tableName = "tbl0";
+        String qualifiedTableName = KEYSPACE + '.' + tableName;
+
+        test("CREATE TABLE " + qualifiedTableName + " (k int PRIMARY KEY, v 
int) WITH transactional_mode='full'", cluster -> {
+            cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + 
qualifiedTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2);
+            SimpleQueryResult result = 
cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + 
qualifiedTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL);
+
+            cluster.get(1).flush(withKeyspace("%s"));
+
+            String originalToken = cluster.get(1).callOnInstance(() -> 
getOnlyElement(StorageService.instance.getTokens()));
+
+            long token = (Long) result.toObjectArrays()[0][0];
+
+            assertTrue(token < Long.parseLong(originalToken));
+
+            assertEquals(1, (int) cluster.get(1).callOnInstance(() -> 
Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size()));
+
+            // Cluster 1 no longer owns token
+            cluster.get(1).runOnInstance(() -> {
+                StorageService.instance.move(Long.toString(token - 1000));
+            });
+
+            // Wait until Accord retires range, so it no longer has ownership 
of token
+            cluster.get(1).runOnInstance(() -> {
+                TableId tid = Schema.instance.getTableMetadata(KEYSPACE, 
tableName).id();
+                RoutingKey key = TokenKey.parse(tid, String.valueOf(token), 
Murmur3Partitioner.instance);
+
+                Util.spinUntilTrue(() -> {
+                    boolean doesNotContainsToken = true;
+                    List<Ranges> inUseRanges = 
getBlocking(AccordService.instance().node().commandStores().getInUseRanges());
+                    for (Ranges ranges : inUseRanges)
+                    {
+                        if (ranges.intersects(key))
+                            doesNotContainsToken = false;
+                    }
+                    return doesNotContainsToken;
+                });
+            });
+
+            cluster.get(1).nodetool("cleanup", KEYSPACE, tableName);
+
+            assertEquals(0, (int) cluster.get(1).callOnInstance(() -> 
Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size()));
+
+            // Ensure data is cleaned up
+            assertEquals(0, cluster.get(1).executeInternal("SELECT k FROM " + 
qualifiedTableName + " WHERE k = 1 LIMIT 1").length);
+
+            cluster.get(1).runOnInstance(() -> {
+                StorageService.instance.move(originalToken);
+            });
+        });
+    }
+
+    @Test
+    public void accordNodetoolCleanupPartialSSTableTest() throws Throwable
+    {
+        String tableName = "tbl1";
+        String qualifiedTableName = KEYSPACE + '.' + tableName;
+
+        test("CREATE TABLE " + qualifiedTableName + " (k int PRIMARY KEY, v 
int) WITH transactional_mode='full'", cluster -> {
+            cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + 
qualifiedTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2);
+            cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + 
qualifiedTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 2, 2);
+
+            SimpleQueryResult result1 = 
cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + 
qualifiedTableName + " WHERE k = 2 LIMIT 1", ConsistencyLevel.SERIAL);
+            SimpleQueryResult result2 = 
cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + 
qualifiedTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL);
+
+            cluster.get(1).flush(withKeyspace("%s"));
+
+            String originalToken = cluster.get(1).callOnInstance(() -> 
getOnlyElement(StorageService.instance.getTokens()));
+
+            long token1 = (Long) result1.toObjectArrays()[0][0];
+            long token2 = (Long) result2.toObjectArrays()[0][0];
+
+            assertTrue((token2 < (token1 - 1000)) && token1 < 
Long.parseLong(originalToken));
+
+            assertEquals(1, (int) cluster.get(1).callOnInstance(() -> 
Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size()));
+
+            // Cluster 1 now only owns token2, but Accord still requires token1
+            cluster.get(1).runOnInstance(() -> {
+                
Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).disableAutoCompaction();
+                StorageService.instance.move(Long.toString(token1 - 1000));
+            });
+
+            NodeToolResult result = cluster.get(1).nodetoolResult("cleanup", 
KEYSPACE, tableName);
+
+            assertTrue(result.getStdout().contains("Partially cleaned up 
SSTables for ranges that are no longer owned in keyspace " + KEYSPACE));
+
+            assertEquals(2, result.getRc());
+            assertEquals(1, (int) cluster.get(1).callOnInstance(() -> 
Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size()));
+
+            // Ensure data is still there
+            assertEquals(1, cluster.get(1).executeInternal("SELECT k FROM " + 
qualifiedTableName + " WHERE k = 1 LIMIT 1").length);
+            assertEquals(1, cluster.get(1).executeInternal("SELECT k FROM " + 
qualifiedTableName + " WHERE k = 2 LIMIT 1").length);
+
+            cluster.get(1).runOnInstance(() -> {
+                StorageService.instance.move(originalToken);

Review Comment:
   This move operation to the original token should be in an @After method so 
the node is always at the original token between tests.



##########
test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolCleanupTest.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.accord;
+
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.service.StorageService;
+
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import org.junit.Test;
+
+public class AccordNodetoolCleanupTest extends TestBaseImpl
+{
+    @Test
+    public void accordNodetoolCleanupTest() throws Throwable
+    {
+        String tableName = "tbl0";
+        String qualifiedTableName = KEYSPACE + '.' + tableName;
+        try (Cluster cluster = 
init(builder().withNodes(2).withoutVNodes().withConfig((config) ->
+                                                                               
       config
+                                                                               
       .set("accord.shard_durability_target_splits", "1")
+                                                                               
       .set("accord.shard_durability_cycle", "20s")
+                                                                               
       .with(Feature.NETWORK, Feature.GOSSIP)).start()))
+        {
+            cluster.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
+            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}");
+            cluster.schemaChange("CREATE TABLE " + qualifiedTableName + " (k 
int PRIMARY KEY, v int) WITH transactional_mode='full'");
+
+            cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + 
qualifiedTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2);
+
+            SimpleQueryResult result = 
cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + 
qualifiedTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL);
+
+            cluster.get(1).flush(withKeyspace("%s"));
+
+            String originalToken = cluster.get(1).callOnInstance(() -> 
getOnlyElement(StorageService.instance.getTokens()));
+
+            long token = (Long) result.toObjectArrays()[0][0];
+
+            assertTrue(token < Long.parseLong(originalToken));
+
+            assertEquals(1, (int) cluster.get(1).callOnInstance(() -> 
Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size()));
+
+            // Cluster 1 no longer owns token
+            cluster.get(1).runOnInstance(() -> {
+                StorageService.instance.move(Long.toString(token - 1000));
+            });
+
+            // Wait until Accord retires range, so it no longer has ownership 
of token
+            try
+            {
+                Thread.sleep(20000);
+            }
+            catch (InterruptedException e)
+            {
+                fail();
+            }
+
+            cluster.get(1).nodetool("cleanup", KEYSPACE, tableName);
+
+            assertEquals(0, (int) cluster.get(1).callOnInstance(() -> 
Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size()));
+        }
+    }
+
+    @Test
+    public void accordNodetoolCleanupPartialSSTableTest() throws Throwable
+    {
+        String tableName = "tbl0";
+        String qualifiedTableName = KEYSPACE + '.' + tableName;
+        try (Cluster cluster = 
init(builder().withNodes(2).withoutVNodes().withConfig((config) ->
+                                                                               
       config
+                                                                               
       .set("accord.shard_durability_target_splits", "1")
+                                                                               
       .set("accord.shard_durability_cycle", "20s")
+                                                                               
       .with(Feature.NETWORK, Feature.GOSSIP)).start()))
+        {
+            cluster.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
+            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}");
+            cluster.schemaChange("CREATE TABLE " + qualifiedTableName + " (k 
int PRIMARY KEY, v int) WITH transactional_mode='full'");
+
+            cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + 
qualifiedTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2);
+            cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + 
qualifiedTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 2, 2);
+
+            SimpleQueryResult result1 = 
cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + 
qualifiedTableName + " WHERE k = 2 LIMIT 1", ConsistencyLevel.SERIAL);
+            SimpleQueryResult result2 = 
cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + 
qualifiedTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL);
+
+            cluster.get(1).flush(withKeyspace("%s"));
+
+            String originalToken = cluster.get(1).callOnInstance(() -> 
getOnlyElement(StorageService.instance.getTokens()));
+
+            long token1 = (Long) result1.toObjectArrays()[0][0];
+            long token2 = (Long) result2.toObjectArrays()[0][0];
+
+            assertTrue((token2 < (token1 - 1000)) && token1 < 
Long.parseLong(originalToken));
+
+            assertEquals(1, (int) cluster.get(1).callOnInstance(() -> 
Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size()));
+
+            // Cluster 1 now only owns token2, but Accord still requires token1
+            cluster.get(1).runOnInstance(() -> {
+                
Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).disableAutoCompaction();
+                StorageService.instance.move(Long.toString(token1 - 1000));
+            });
+
+            NodeToolResult result = cluster.get(1).nodetoolResult("cleanup", 
KEYSPACE, tableName);
+
+            assertTrue(result.getStdout().contains("Partially cleaned up 
SSTables for ranges that are no longer owned in keyspace " + KEYSPACE));
+
+            assertEquals(1, (int) cluster.get(1).callOnInstance(() -> 
Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size()));
+        }
+    }
+
+    @Test
+    public void accordNodetoolCleanupRangeInUseTest() throws Throwable
+    {
+        String tableName = "tbl0";
+        String qualifiedTableName = KEYSPACE + '.' + tableName;
+        try (Cluster cluster = 
init(builder().withNodes(2).withoutVNodes().withConfig((config) ->
+                                                                               
       config
+                                                                               
       .with(Feature.NETWORK, Feature.GOSSIP)).start()))
+        {
+            cluster.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
+            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}");
+            cluster.schemaChange("CREATE TABLE " + qualifiedTableName + " (k 
int PRIMARY KEY, v int) WITH transactional_mode='full'");
+
+            cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + 
qualifiedTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2);
+
+            SimpleQueryResult result = 
cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + 
qualifiedTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL);
+
+            cluster.get(1).flush(withKeyspace("%s"));
+
+            String originalToken = cluster.get(1).callOnInstance(() -> 
getOnlyElement(StorageService.instance.getTokens()));
+
+            long token = (Long) result.toObjectArrays()[0][0];
+
+            assertTrue(token < Long.parseLong(originalToken));
+
+            assertEquals(1, (int) cluster.get(1).callOnInstance(() -> 
Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size()));
+
+            cluster.get(1).runOnInstance(() -> 
StorageService.instance.move(Long.toString(token - 1000)));
+
+            cluster.get(1).nodetoolResult("cleanup", KEYSPACE, tableName);

Review Comment:
   Also, this nodetool result is not validated.



##########
test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolCleanupTest.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.accord;
+
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.service.StorageService;
+
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import org.junit.Test;
+
+public class AccordNodetoolCleanupTest extends TestBaseImpl
+{
+    @Test
+    public void accordNodetoolCleanupTest() throws Throwable
+    {
+        String tableName = "tbl0";
+        String qualifiedTableName = KEYSPACE + '.' + tableName;
+        try (Cluster cluster = 
init(builder().withNodes(2).withoutVNodes().withConfig((config) ->
+                                                                               
       config
+                                                                               
       .set("accord.shard_durability_target_splits", "1")
+                                                                               
       .set("accord.shard_durability_cycle", "20s")
+                                                                               
       .with(Feature.NETWORK, Feature.GOSSIP)).start()))
+        {
+            cluster.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
+            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}");
+            cluster.schemaChange("CREATE TABLE " + qualifiedTableName + " (k 
int PRIMARY KEY, v int) WITH transactional_mode='full'");
+
+            cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + 
qualifiedTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2);
+
+            SimpleQueryResult result = 
cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + 
qualifiedTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL);
+
+            cluster.get(1).flush(withKeyspace("%s"));
+
+            String originalToken = cluster.get(1).callOnInstance(() -> 
getOnlyElement(StorageService.instance.getTokens()));
+
+            long token = (Long) result.toObjectArrays()[0][0];
+
+            assertTrue(token < Long.parseLong(originalToken));
+
+            assertEquals(1, (int) cluster.get(1).callOnInstance(() -> 
Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size()));
+
+            // Cluster 1 no longer owns token
+            cluster.get(1).runOnInstance(() -> {
+                StorageService.instance.move(Long.toString(token - 1000));
+            });
+
+            // Wait until Accord retires range, so it no longer has ownership 
of token
+            try
+            {
+                Thread.sleep(20000);
+            }
+            catch (InterruptedException e)
+            {
+                fail();
+            }
+
+            cluster.get(1).nodetool("cleanup", KEYSPACE, tableName);
+
+            assertEquals(0, (int) cluster.get(1).callOnInstance(() -> 
Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size()));
+        }
+    }
+
+    @Test
+    public void accordNodetoolCleanupPartialSSTableTest() throws Throwable
+    {
+        String tableName = "tbl0";
+        String qualifiedTableName = KEYSPACE + '.' + tableName;
+        try (Cluster cluster = 
init(builder().withNodes(2).withoutVNodes().withConfig((config) ->
+                                                                               
       config
+                                                                               
       .set("accord.shard_durability_target_splits", "1")
+                                                                               
       .set("accord.shard_durability_cycle", "20s")
+                                                                               
       .with(Feature.NETWORK, Feature.GOSSIP)).start()))
+        {
+            cluster.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
+            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}");
+            cluster.schemaChange("CREATE TABLE " + qualifiedTableName + " (k 
int PRIMARY KEY, v int) WITH transactional_mode='full'");
+
+            cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + 
qualifiedTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2);
+            cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + 
qualifiedTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 2, 2);
+
+            SimpleQueryResult result1 = 
cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + 
qualifiedTableName + " WHERE k = 2 LIMIT 1", ConsistencyLevel.SERIAL);
+            SimpleQueryResult result2 = 
cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + 
qualifiedTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL);
+
+            cluster.get(1).flush(withKeyspace("%s"));
+
+            String originalToken = cluster.get(1).callOnInstance(() -> 
getOnlyElement(StorageService.instance.getTokens()));
+
+            long token1 = (Long) result1.toObjectArrays()[0][0];
+            long token2 = (Long) result2.toObjectArrays()[0][0];
+
+            assertTrue((token2 < (token1 - 1000)) && token1 < 
Long.parseLong(originalToken));
+
+            assertEquals(1, (int) cluster.get(1).callOnInstance(() -> 
Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size()));
+
+            // Cluster 1 now only owns token2, but Accord still requires token1
+            cluster.get(1).runOnInstance(() -> {
+                
Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).disableAutoCompaction();
+                StorageService.instance.move(Long.toString(token1 - 1000));
+            });
+
+            NodeToolResult result = cluster.get(1).nodetoolResult("cleanup", 
KEYSPACE, tableName);
+
+            assertTrue(result.getStdout().contains("Partially cleaned up 
SSTables for ranges that are no longer owned in keyspace " + KEYSPACE));
+
+            assertEquals(1, (int) cluster.get(1).callOnInstance(() -> 
Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size()));
+        }
+    }
+
+    @Test
+    public void accordNodetoolCleanupRangeInUseTest() throws Throwable
+    {
+        String tableName = "tbl0";
+        String qualifiedTableName = KEYSPACE + '.' + tableName;
+        try (Cluster cluster = 
init(builder().withNodes(2).withoutVNodes().withConfig((config) ->
+                                                                               
       config
+                                                                               
       .with(Feature.NETWORK, Feature.GOSSIP)).start()))
+        {
+            cluster.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
+            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}");
+            cluster.schemaChange("CREATE TABLE " + qualifiedTableName + " (k 
int PRIMARY KEY, v int) WITH transactional_mode='full'");
+
+            cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + 
qualifiedTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2);
+
+            SimpleQueryResult result = 
cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + 
qualifiedTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL);
+
+            cluster.get(1).flush(withKeyspace("%s"));
+
+            String originalToken = cluster.get(1).callOnInstance(() -> 
getOnlyElement(StorageService.instance.getTokens()));
+
+            long token = (Long) result.toObjectArrays()[0][0];
+
+            assertTrue(token < Long.parseLong(originalToken));
+
+            assertEquals(1, (int) cluster.get(1).callOnInstance(() -> 
Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size()));
+
+            cluster.get(1).runOnInstance(() -> 
StorageService.instance.move(Long.toString(token - 1000)));
+
+            cluster.get(1).nodetoolResult("cleanup", KEYSPACE, tableName);

Review Comment:
   On this pass of review, I don't see how this issue was resolved.



##########
src/java/org/apache/cassandra/db/compaction/CompactionManager.java:
##########
@@ -823,7 +846,15 @@ public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
                         transaction.cancel(sstable);
                         skippedSStables++;
                     }
+                    else if (!needsCleanupAccord)
+                    {
+                        sstableIter.remove();
+                        transaction.cancel(sstable);
+                        skippedSStables++;
+                        this.incompleteOperation = true;
+                    }
                 }
+
                 logger.info("Skipping cleanup for {}/{} sstables for {}.{} since they are fully contained in owned ranges (full ranges: {}, transient ranges: {})",

Review Comment:
   This log message now counts SSTables skipped by the Accord else if branch, 
but those are NOT in owned ranges — they're in ranges no-longer-owned but 
in-use by Accord. That scenario should be logged explicitly so operators know 
why the skipping occurred.
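
   A minimal sketch of the separate accounting suggested above, assuming the
   two skip reasons can be counted independently. The `SkipAccounting` class,
   the `Decision` enum and the message wording are hypothetical stand-ins for
   illustration, not the code in `CompactionManager`:

```java
import java.util.Iterator;
import java.util.List;

public class SkipAccounting
{
    // Hypothetical stand-in for the per-sstable decision made while filtering.
    public enum Decision { NEEDS_CLEANUP, FULLY_OWNED, IN_USE_BY_ACCORD }

    public static final class Result
    {
        public int skippedOwned;            // fully contained in owned ranges
        public int skippedInUseByAccord;    // no longer owned, but still in use by Accord
        public boolean incompleteOperation; // set when cleanup had to be deferred
    }

    // Removes every sstable that is skipped and records why it was skipped.
    public static Result filter(List<Decision> sstables)
    {
        Result result = new Result();
        Iterator<Decision> iter = sstables.iterator();
        while (iter.hasNext())
        {
            switch (iter.next())
            {
                case FULLY_OWNED:
                    iter.remove();
                    result.skippedOwned++;
                    break;
                case IN_USE_BY_ACCORD:
                    iter.remove();
                    result.skippedInUseByAccord++;
                    result.incompleteOperation = true;
                    break;
                default:
                    break; // NEEDS_CLEANUP stays in the list
            }
        }
        return result;
    }

    // Builds one message that reports each skip reason separately.
    public static String describe(Result result, int total)
    {
        return String.format("Skipping cleanup for %d/%d sstables: %d fully contained in owned ranges, " +
                             "%d in no-longer-owned ranges still in use by Accord",
                             result.skippedOwned + result.skippedInUseByAccord, total,
                             result.skippedOwned, result.skippedInUseByAccord);
    }
}
```

   Keeping two counters means the existing "fully contained in owned ranges"
   message stays accurate, and the Accord case can be logged on its own line
   or at a different level.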



##########
test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolCleanupTest.java:
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.accord;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.accord.api.TokenKey;
+
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.RoutingKey;
+import accord.primitives.Ranges;
+
+public class AccordNodetoolCleanupTest extends AccordTestBase
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(AccordNodetoolCleanupTest.class);
+
+    @Override
+    protected Logger logger()
+    {
+        return logger;
+    }
+
+    @BeforeClass
+    public static void setupClass() throws IOException
+    {
+        AccordTestBase.setupCluster(builder -> builder
+                                               .withoutVNodes()
+                                               .appendConfig(config -> config
+                                                                       
.set("accord.shard_durability_cycle", "20s")
+                                                                       
.with(Feature.GOSSIP, Feature.NETWORK)), 2);
+        SHARED_CLUSTER.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE + 
';');
+        SHARED_CLUSTER.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}");
+    }
+
+    @Test
+    public void accordNodetoolCleanupTest() throws Throwable
+    {
+        String tableName = "tbl0";
+        String qualifiedTableName = KEYSPACE + '.' + tableName;
+
+        test("CREATE TABLE " + qualifiedTableName + " (k int PRIMARY KEY, v 
int) WITH transactional_mode='full'", cluster -> {
+            cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + 
qualifiedTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2);
+            SimpleQueryResult result = 
cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + 
qualifiedTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL);
+
+            cluster.get(1).flush(withKeyspace("%s"));
+
+            String originalToken = cluster.get(1).callOnInstance(() -> 
getOnlyElement(StorageService.instance.getTokens()));
+
+            long token = (Long) result.toObjectArrays()[0][0];
+
+            assertTrue(token < Long.parseLong(originalToken));
+
+            assertEquals(1, (int) cluster.get(1).callOnInstance(() -> 
Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size()));
+
+            // Cluster 1 no longer owns token
+            cluster.get(1).runOnInstance(() -> {
+                StorageService.instance.move(Long.toString(token - 1000));
+            });
+
+            // Wait until Accord retires range, so it no longer has ownership 
of token
+            cluster.get(1).runOnInstance(() -> {
+                TableId tid = Schema.instance.getTableMetadata(KEYSPACE, 
tableName).id();
+                RoutingKey key = TokenKey.parse(tid, String.valueOf(token), 
Murmur3Partitioner.instance);
+
+                Util.spinUntilTrue(() -> {

Review Comment:
   This only waits 10 seconds, but Accord's durability cycle is 20 seconds by 
default. For this test you want to drastically decrease the durability cycle, 
to something like 1 second, so the test doesn't have to wait long.
   
   Other tests need it set to 20 seconds so that they can hit the case where 
durability hasn't occurred yet, but isn't that unreliable, since the tests 
aren't aligned with the durability scheduler?
   
   The right thing to do here is set a very short durability cycle (or use the 
default if it is already short) and then pause/unpause durability scheduling to 
construct the desired outcome.
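
   To illustrate the pause/unpause pattern, here is a toy sketch. It does not
   use Accord's scheduler API: `PausableCycle`, its methods and the local
   `spinUntilTrue` helper are all hypothetical, and the periodic task just
   counts completed cycles. The point is only that with a short cycle and an
   explicit pause, the test decides when the "not yet durable" and "durable"
   states occur instead of racing a 20 second timer:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for a periodic durability task that a test can pause.
public class PausableCycle implements AutoCloseable
{
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final AtomicBoolean paused = new AtomicBoolean(true); // starts paused
    private final AtomicInteger completedCycles = new AtomicInteger();

    public PausableCycle(long cycleMillis)
    {
        // The task fires every cycle, but only does work while unpaused.
        executor.scheduleAtFixedRate(() -> {
            if (!paused.get())
                completedCycles.incrementAndGet();
        }, cycleMillis, cycleMillis, TimeUnit.MILLISECONDS);
    }

    public void pause()   { paused.set(true); }
    public void unpause() { paused.set(false); }
    public int completedCycles() { return completedCycles.get(); }

    // Polls the condition until it holds or the timeout elapses.
    public static boolean spinUntilTrue(BooleanSupplier condition, long timeoutMillis)
    {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (System.nanoTime() < deadline)
        {
            if (condition.getAsBoolean())
                return true;
            try
            {
                Thread.sleep(5);
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return condition.getAsBoolean();
    }

    @Override
    public void close()
    {
        executor.shutdownNow();
    }
}
```

   A test built this way would assert the pre-durability behaviour while the
   cycle is paused, call `unpause()`, and then spin with a timeout comfortably
   longer than one cycle before asserting the post-durability behaviour.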



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

