This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new cdd4529f Follow-up to CASSANDRA-21013: fix rf=1 catchup
cdd4529f is described below
commit cdd4529f67492becb10ebca94678b5d08d3d3d11
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Fri Nov 14 14:25:34 2025 +0000
Follow-up to CASSANDRA-21013: fix rf=1 catchup
---
.../main/java/accord/coordinate/FetchDurableBefore.java | 4 +++-
accord-core/src/main/java/accord/local/Catchup.java | 9 +++++++--
accord-core/src/main/java/accord/primitives/Ranges.java | 2 ++
.../src/main/java/accord/utils/ReducingRangeMap.java | 14 ++++++++++++++
4 files changed, 26 insertions(+), 3 deletions(-)
diff --git a/accord-core/src/main/java/accord/coordinate/FetchDurableBefore.java b/accord-core/src/main/java/accord/coordinate/FetchDurableBefore.java
index 328af527..7bb515f7 100644
--- a/accord-core/src/main/java/accord/coordinate/FetchDurableBefore.java
+++ b/accord-core/src/main/java/accord/coordinate/FetchDurableBefore.java
@@ -51,7 +51,9 @@ public class FetchDurableBefore extends AbstractCoordination<Ranges, DurableBefo
void start()
{
super.start();
- contact(ignore -> new GetDurableBefore());
+ contact(ignore -> new GetDurableBefore(), id -> !node.id().equals(id));
+ markSelfContacted();
+ onSuccess(node.id(), new DurableBeforeReply(node.durableBefore()));
}
public static AsyncChain<DurableBefore> catchup(Node node)
diff --git a/accord-core/src/main/java/accord/local/Catchup.java b/accord-core/src/main/java/accord/local/Catchup.java
index 15f2b1aa..edf1314f 100644
--- a/accord-core/src/main/java/accord/local/Catchup.java
+++ b/accord-core/src/main/java/accord/local/Catchup.java
@@ -20,6 +20,7 @@ package accord.local;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import org.slf4j.Logger;
@@ -56,7 +57,7 @@ public class Catchup
void register(SafeCommandStore safeStore)
{
- waitingOn = safeStore.ranges().all();
+ waitingOn =
safeStore.ranges().all().slice(durableBefore.ranges(Objects::nonNull), Minimal);
updateWaitingOn(safeStore);
if (waitingOn.isEmpty()) setSuccess(null);
@@ -68,7 +69,11 @@ public class Catchup
RedundantBefore redundantBefore = safeStore.redundantBefore();
Ranges newWaitingOn = redundantBefore.removeLostOrStale(waitingOn);
if (newWaitingOn != waitingOn)
- logger.info("{}: {} are retired (or stale)",
safeStore.commandStore(), waitingOn.without(newWaitingOn));
+ {
+ Ranges retiredOrStale = waitingOn.without(newWaitingOn);
+ if (!retiredOrStale.isEmpty())
+ logger.info("{}: {} are retired (or stale)",
safeStore.commandStore(), retiredOrStale);
+ }
newWaitingOn = durableBefore.foldlWithBounds(newWaitingOn,
(DurableBefore.Entry entry, Ranges ranges, RoutingKey entryStart, RoutingKey
entryEnd) -> {
Ranges entryRanges = Ranges.of(Range.create(entryStart,
entryEnd));
diff --git a/accord-core/src/main/java/accord/primitives/Ranges.java b/accord-core/src/main/java/accord/primitives/Ranges.java
index 978319c7..ae7dd126 100644
--- a/accord-core/src/main/java/accord/primitives/Ranges.java
+++ b/accord-core/src/main/java/accord/primitives/Ranges.java
@@ -185,6 +185,7 @@ public class Ranges extends AbstractRanges implements Iterable<Range>, Seekables
return with(Ranges.of(withKey.asRange()));
}
+ // TODO (expected): should return self if no changes
public Ranges without(Unseekables<?> keysOrRanges)
{
if (keysOrRanges.domain() == Routable.Domain.Key)
@@ -192,6 +193,7 @@ public class Ranges extends AbstractRanges implements Iterable<Range>, Seekables
return
ofSortedAndDeoverlappedUnchecked(withoutInternal((AbstractRanges)
keysOrRanges));
}
+ // TODO (expected): should return self if no changes
/**
* Subtracts the given set of ranges from this
*/
diff --git a/accord-core/src/main/java/accord/utils/ReducingRangeMap.java b/accord-core/src/main/java/accord/utils/ReducingRangeMap.java
index 692c20e8..77f9c153 100644
--- a/accord-core/src/main/java/accord/utils/ReducingRangeMap.java
+++ b/accord-core/src/main/java/accord/utils/ReducingRangeMap.java
@@ -486,4 +486,18 @@ public class ReducingRangeMap<V> extends ReducingIntervalMap<RoutingKey, V>
return new ReducingRangeMap<>(inclusiveEnds, starts.toArray(new
RoutingKey[0]), (V[])values.toArray(new Object[0]));
}
}
+
+ public Ranges ranges(Predicate<V> include)
+ {
+ Range[] ranges = new Range[values.length];
+ int count = 0;
+ for (int i = 0 ; i < values.length ; ++i)
+ {
+ if (include.test(values[i]))
+ ranges[count++] = Range.create(starts[i], starts[i+1]);
+ }
+ if (count < ranges.length)
+ ranges = Arrays.copyOf(ranges, count);
+ return Ranges.ofSortedAndDeoverlapped(ranges);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]