aweisberg commented on code in PR #4692:
URL: https://github.com/apache/cassandra/pull/4692#discussion_r3060424736


##########
src/java/org/apache/cassandra/db/compaction/CompactionManager.java:
##########
@@ -667,6 +682,12 @@ public void execute(LifecycleTransaction input)
             {
                 scrubOne(cfs, input, options, active);
             }
+
+            @Override
+            public boolean incompleteOperation()

Review Comment:
   Use a default method on OneSSTableOperation so you don't have to add this override to every implementation.
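   A minimal sketch of the suggestion. It assumes a simplified OneSSTableOperation that takes an sstable name instead of a LifecycleTransaction, and ScrubLikeOperation and CleanupLikeOperation are hypothetical names. Only operations that can leave work undone override the default:

```java
// Simplified stand-in for CompactionManager.OneSSTableOperation; the real
// interface also has filterSSTables/execute taking a LifecycleTransaction.
interface OneSSTableOperation
{
    void execute(String sstable);

    // Default: the operation finishes everything it was asked to do,
    // so scrub, upgrade, etc. need no override.
    default boolean incompleteOperation()
    {
        return false;
    }
}

// Hypothetical operation that relies on the default.
class ScrubLikeOperation implements OneSSTableOperation
{
    @Override
    public void execute(String sstable) { /* scrub the sstable */ }
}

// Hypothetical operation that can skip data, so it overrides the default.
class CleanupLikeOperation implements OneSSTableOperation
{
    private boolean skippedData = false;

    @Override
    public void execute(String sstable)
    {
        // Pretend sstables whose name starts with "accord" hold data still in use.
        if (sstable.startsWith("accord"))
            skippedData = true;
    }

    @Override
    public boolean incompleteOperation()
    {
        return skippedData;
    }
}
```

   With this shape, scrubOne's anonymous class in the hunk above would not need its own incompleteOperation override.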



##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -1175,6 +1175,12 @@ public void awaitDone(TableId id, long epoch)
         getBlocking(node.durability().sync("Drop Keyspace/Table (Epoch " + epoch + ')', ExclusiveSyncPoint, TxnId.minForEpoch(epoch), ranges, Self, All, DatabaseDescriptor.getAccordRangeSyncPointTimeoutNanos(), NANOSECONDS), ranges, new LatencyRequestBookkeeping(null), startedAt, deadline, false);
     }
 
+    @Override
+    public List<Ranges> getInUseRanges()

Review Comment:
   getInUseRanges treats the ranges Accord manages as global across tables, but they are actually a per-table property. This should be a map from TableId to a collection of ranges.
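   A rough sketch of the per-table shape, assuming simplified records in place of Cassandra's TableId and Accord's Ranges/TokenRange. InUseRangesSketch and markInUse are hypothetical names used only for illustration:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins: the real TableId and TokenRange are richer types.
record TableId(String name) {}
record TokenRange(long start, long end) {}

class InUseRangesSketch
{
    private final Map<TableId, List<TokenRange>> inUse = new HashMap<>();

    // Hypothetical hook: record ranges Accord still manages for one table.
    void markInUse(TableId table, List<TokenRange> ranges)
    {
        inUse.computeIfAbsent(table, t -> new ArrayList<>()).addAll(ranges);
    }

    // Keyed by table rather than one list shared by all tables.
    Map<TableId, List<TokenRange>> getInUseRanges()
    {
        return Collections.unmodifiableMap(inUse);
    }
}
```

   Cleanup for a given table would then only consult its own entry, e.g. getInUseRanges().getOrDefault(tableId, List.of()), so ranges Accord uses for one table never keep data alive in another.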



##########
src/java/org/apache/cassandra/db/compaction/CompactionManager.java:
##########
@@ -789,12 +822,22 @@ public AllSSTableOpStatus performCleanup(final ColumnFamilyStore cfStore, int jo
         if (partitioner.getClass() == LocalPartitioner.class)
             localWrites = RangesAtEndpoint.of(Replica.fullReplica(local, new Range<>(partitioner.getMinimumToken(), partitioner.getMinimumToken())));
 
-        final Set<Range<Token>> allRanges = new HashSet<>(localWrites.ranges());
-        final Set<Range<Token>> transientRanges = new HashSet<>(localWrites.onlyTransient().ranges());
-        final Set<Range<Token>> fullRanges = new HashSet<>(localWrites.onlyFull().ranges());
+        Set<Range<Token>> rangesInUseByAccord = new HashSet<>();
+        if (AccordService.isSetup())
+        {
+            List<Ranges> inUseRanges = AccordService.instance().getInUseRanges();
+            for (Ranges ranges : inUseRanges)
+                ranges.stream().forEach(r -> rangesInUseByAccord.add(((TokenRange) r).toKeyspaceRange()));
+        }
+
+        final Set<Range<Token>> allRanges = Stream.concat(localWrites.ranges().stream(), rangesInUseByAccord.stream()).collect(Collectors.toSet());
+        final Set<Range<Token>> transientRanges = Stream.concat(localWrites.onlyTransient().ranges().stream(), rangesInUseByAccord.stream()).collect(Collectors.toSet());

Review Comment:
   Accord doesn't support transient replication, so this should really be a checkState that errors out if we are using Accord and there are transient ranges. In any case, Accord may still be using transient ranges even when there is cleanup work to do to remove them. A precise check would be more complicated, because Accord would need to report whether it is using a range transiently or fully, and that distinction doesn't exist yet.
   
   That is why I suggest the checkState, and not expanding the transient ranges to include Accord-owned ranges. Just leave transientRanges as is.
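   A minimal sketch of that combination, assuming a simplified TokenRange record in place of Range<Token>. CleanupRanges and its constructor parameters are hypothetical. It throws IllegalStateException directly (which is what Guava's Preconditions.checkState throws) so the example stays dependency-free:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical simplified range type standing in for Range<Token>.
record TokenRange(long start, long end) {}

class CleanupRanges
{
    final Set<TokenRange> allRanges;
    final Set<TokenRange> transientRanges;

    CleanupRanges(Set<TokenRange> localRanges, Set<TokenRange> localTransient,
                  Set<TokenRange> accordInUse, boolean accordEnabled)
    {
        // checkState-style guard: Accord has no notion of transient replicas,
        // so refuse to run rather than guess how Accord uses these ranges.
        if (accordEnabled && !localTransient.isEmpty())
            throw new IllegalStateException("Accord does not support transient replication, but transient ranges are present: " + localTransient);

        // Keep ranges Accord still uses so cleanup does not drop that data.
        Set<TokenRange> all = new HashSet<>(localRanges);
        all.addAll(accordInUse);
        this.allRanges = all;

        // Transient ranges stay exactly as computed from local writes.
        this.transientRanges = new HashSet<>(localTransient);
    }
}
```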



##########
src/java/org/apache/cassandra/db/compaction/CompactionManager.java:
##########
@@ -634,15 +646,18 @@ public Object call() throws Exception
 
     private static interface OneSSTableOperation
     {
+
         Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction);
         void execute(LifecycleTransaction input) throws IOException;
+        boolean incompleteOperation();
     }
 
     public enum AllSSTableOpStatus
     {
         SUCCESSFUL(0),
         ABORTED(1),
-        UNABLE_TO_CANCEL(2);
+        UNABLE_TO_CANCEL(2),
+        INCOMPLETE(3);

Review Comment:
   NodeProbe doesn't handle this new status. Cleanup should signal an error if some data wasn't cleaned up, since what was requested didn't fully happen.
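   A rough sketch of client-side handling where every non-SUCCESSFUL code, including the new INCOMPLETE, is reported as a failure. The enum mirrors the codes in the diff, but the statusCode field, fromCode, CleanupResultReporter and the messages are hypothetical and are not NodeProbe's actual code:

```java
// Mirrors the AllSSTableOpStatus codes from the diff; field and helper names are assumptions.
enum AllSSTableOpStatus
{
    SUCCESSFUL(0), ABORTED(1), UNABLE_TO_CANCEL(2), INCOMPLETE(3);

    final int statusCode;

    AllSSTableOpStatus(int statusCode) { this.statusCode = statusCode; }

    static AllSSTableOpStatus fromCode(int code)
    {
        for (AllSSTableOpStatus s : values())
            if (s.statusCode == code)
                return s;
        throw new IllegalArgumentException("Unknown status code " + code);
    }
}

// Hypothetical reporter: returns an error message, or null on full success.
class CleanupResultReporter
{
    static String describeFailure(String keyspace, int code)
    {
        switch (AllSSTableOpStatus.fromCode(code))
        {
            case SUCCESSFUL:
                return null;
            case ABORTED:
                return "Cleanup of keyspace " + keyspace + " was aborted";
            case UNABLE_TO_CANCEL:
                return "Cleanup of keyspace " + keyspace + " could not mark sstables compacting";
            case INCOMPLETE:
                // Requested cleanup did not fully happen: surface it as an error.
                return "Cleanup of keyspace " + keyspace + " left data that could not be removed";
            default:
                throw new AssertionError("unhandled status");
        }
    }
}
```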



##########
src/java/org/apache/cassandra/db/compaction/CompactionManager.java:
##########
@@ -807,9 +850,9 @@ public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
                     SSTableReader sstable = sstableIter.next();
                     boolean needsCleanupFull = needsCleanup(sstable, fullRanges);
                     boolean needsCleanupTransient = !transientRanges.isEmpty() && sstable.isRepaired() && needsCleanup(sstable, transientRanges);
+                    totalSSTables++;

Review Comment:
   Why move this?



##########
src/java/org/apache/cassandra/db/compaction/CompactionManager.java:
##########
@@ -834,7 +878,13 @@ public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
             public void execute(LifecycleTransaction txn) throws IOException
             {
                 CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, allRanges, transientRanges, txn.onlyOne().isRepaired(), FBUtilities.nowInSeconds());
-                doCleanupOne(cfStore, txn, cleanupStrategy, allRanges, hasIndexes);
+                this.incompleteOperation = doCleanupOne(cfStore, txn, cleanupStrategy, allRanges, hasIndexes, rangesInUseByAccord);

Review Comment:
   This overwrites the completion flag for each sstable, so the final value only reflects what happened to the last sstable. You also don't need this?
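   One way to make the flag sticky, sketched under the assumption that execute may run for several sstables concurrently. AccumulatingCleanup and cleanupOne are hypothetical stand-ins for the operation and doCleanupOne:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical cleanup operation: remembers whether ANY sstable was left
// partially cleaned, instead of only the most recent one.
class AccumulatingCleanup
{
    // AtomicBoolean in case execute() runs for several sstables in parallel.
    private final AtomicBoolean incomplete = new AtomicBoolean(false);

    // Stand-in for doCleanupOne: returns true if data had to be kept.
    private boolean cleanupOne(String sstable)
    {
        return sstable.contains("in-use");
    }

    void execute(String sstable)
    {
        // Only ever flip false -> true; a later fully-cleaned sstable never resets it.
        if (cleanupOne(sstable))
            incomplete.set(true);
    }

    boolean incompleteOperation()
    {
        return incomplete.get();
    }
}
```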



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
