aweisberg commented on code in PR #4692:
URL: https://github.com/apache/cassandra/pull/4692#discussion_r3060424736
##########
src/java/org/apache/cassandra/db/compaction/CompactionManager.java:
##########
@@ -667,6 +682,12 @@ public void execute(LifecycleTransaction input)
{
scrubOne(cfs, input, options, active);
}
+
+ @Override
+ public boolean incompleteOperation()
Review Comment:
Use a default method to avoid having to add this everywhere
##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -1175,6 +1175,12 @@ public void awaitDone(TableId id, long epoch)
getBlocking(node.durability().sync("Drop Keyspace/Table (Epoch " +
epoch + ')', ExclusiveSyncPoint, TxnId.minForEpoch(epoch), ranges, Self, All,
DatabaseDescriptor.getAccordRangeSyncPointTimeoutNanos(), NANOSECONDS), ranges,
new LatencyRequestBookkeeping(null), startedAt, deadline, false);
}
+ @Override
+ public List<Ranges> getInUseRanges()
Review Comment:
getInUseRanges treats the ranges Accord manages as being global across
tables, but it's actually a per table property. This should be a map from
TableId to a collection of ranges.
##########
src/java/org/apache/cassandra/db/compaction/CompactionManager.java:
##########
@@ -789,12 +822,22 @@ public AllSSTableOpStatus performCleanup(final
ColumnFamilyStore cfStore, int jo
if (partitioner.getClass() == LocalPartitioner.class)
localWrites = RangesAtEndpoint.of(Replica.fullReplica(local, new
Range<>(partitioner.getMinimumToken(), partitioner.getMinimumToken())));
- final Set<Range<Token>> allRanges = new
HashSet<>(localWrites.ranges());
- final Set<Range<Token>> transientRanges = new
HashSet<>(localWrites.onlyTransient().ranges());
- final Set<Range<Token>> fullRanges = new
HashSet<>(localWrites.onlyFull().ranges());
+ Set<Range<Token>> rangesInUseByAccord = new HashSet<>();
+ if (AccordService.isSetup())
+ {
+ List<Ranges> inUseRanges =
AccordService.instance().getInUseRanges();
+ for (Ranges ranges : inUseRanges)
+ ranges.stream().forEach(r ->
rangesInUseByAccord.add(((TokenRange) r).toKeyspaceRange()));
+ }
+
+ final Set<Range<Token>> allRanges =
Stream.concat(localWrites.ranges().stream(),
rangesInUseByAccord.stream()).collect(Collectors.toSet());
+ final Set<Range<Token>> transientRanges =
Stream.concat(localWrites.onlyTransient().ranges().stream(),
rangesInUseByAccord.stream()).collect(Collectors.toSet());
Review Comment:
Accord doesn't support transient replication so it should really be a
checkState erroring out if we are using Accord and there are transient ranges.
Either way for transient ranges Accord may actually be using them even when
there is cleanup work to do to remove them. The check would be more complicated
because we would need Accord to report whether it is using the range
transiently or fully and that distinction doesn't exist yet.
So that is why checkState and then don't expand the transient ranges to
include Accord owned ranges. Just leave it as is.
##########
src/java/org/apache/cassandra/db/compaction/CompactionManager.java:
##########
@@ -634,15 +646,18 @@ public Object call() throws Exception
private static interface OneSSTableOperation
{
+
Iterable<SSTableReader> filterSSTables(LifecycleTransaction
transaction);
void execute(LifecycleTransaction input) throws IOException;
+ boolean incompleteOperation();
}
public enum AllSSTableOpStatus
{
SUCCESSFUL(0),
ABORTED(1),
- UNABLE_TO_CANCEL(2);
+ UNABLE_TO_CANCEL(2),
+ INCOMPLETE(3);
Review Comment:
Nodeprobe doesn't handle this. Cleanup should signal an error if some data
wasn't cleaned up since what was requested didn't fully happen.
##########
src/java/org/apache/cassandra/db/compaction/CompactionManager.java:
##########
@@ -807,9 +850,9 @@ public Iterable<SSTableReader>
filterSSTables(LifecycleTransaction transaction)
SSTableReader sstable = sstableIter.next();
boolean needsCleanupFull = needsCleanup(sstable,
fullRanges);
boolean needsCleanupTransient = !transientRanges.isEmpty()
&& sstable.isRepaired() && needsCleanup(sstable, transientRanges);
+ totalSSTables++;
Review Comment:
Why move this?
##########
src/java/org/apache/cassandra/db/compaction/CompactionManager.java:
##########
@@ -834,7 +878,13 @@ public Iterable<SSTableReader>
filterSSTables(LifecycleTransaction transaction)
public void execute(LifecycleTransaction txn) throws IOException
{
CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore,
allRanges, transientRanges, txn.onlyOne().isRepaired(),
FBUtilities.nowInSeconds());
- doCleanupOne(cfStore, txn, cleanupStrategy, allRanges,
hasIndexes);
+ this.incompleteOperation = doCleanupOne(cfStore, txn,
cleanupStrategy, allRanges, hasIndexes, rangesInUseByAccord);
Review Comment:
This overwrites whether it was complete for each sstable so the final value
is just what happened to the last sstable. You also don't need this?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]