This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 08ee5ce1 support filtering by GC commands before loading
08ee5ce1 is described below
commit 08ee5ce1c6301201ccaf7d580a6af289ab4c5765
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Fri Oct 4 13:04:57 2024 +0100
support filtering by GC commands before loading
special case recovery, to avoid loading range commands except where
necessary
---
.../src/main/java/accord/impl/InMemoryCommandStore.java | 1 +
.../java/accord/impl/progresslog/DefaultProgressLog.java | 5 ++++-
accord-core/src/main/java/accord/local/CommandStore.java | 3 ++-
accord-core/src/main/java/accord/local/KeyHistory.java | 1 +
.../src/main/java/accord/local/RedundantBefore.java | 16 ++++++++++++++++
.../src/main/java/accord/messages/BeginRecovery.java | 2 +-
.../src/main/java/accord/utils/PersistentField.java | 6 +-----
7 files changed, 26 insertions(+), 8 deletions(-)
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 385fbd16..67bd325f 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -465,6 +465,7 @@ public abstract class InMemoryCommandStore extends
CommandStore
case NONE:
continue;
case COMMANDS:
+ case RECOVERY:
commandsForKey.put(key,
commandsForKey((RoutingKey) key).createSafeReference());
break;
case TIMESTAMPS:
diff --git
a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
index 31ae8adc..9d1aea56 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
@@ -43,6 +43,7 @@ import accord.primitives.Participants;
import accord.primitives.ProgressToken;
import accord.primitives.Ranges;
import accord.primitives.Route;
+import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.utils.Invariants;
import accord.utils.LogGroupTimers;
@@ -355,7 +356,9 @@ public class DefaultProgressLog implements ProgressLog,
Runnable
if (update != command)
command = blockedBy.updateAttributes(safeStore, update);
- Invariants.checkState(safeStore.ranges().allSince(command.txnId().epoch()).intersects(command.participants().hasTouched()));
+ // TODO (required): tighten up ExclusiveSyncPoint range bounds
+ Invariants.checkState((command.txnId().is(Txn.Kind.ExclusiveSyncPoint) ? safeStore.ranges().all()
+ : safeStore.ranges().allSince(command.txnId().epoch())).intersects(command.participants().hasTouched()));
// TODO (consider): consider triggering a preemption of existing
coordinator (if any) in some circumstances;
// today, an LWT can pre-empt more efficiently (i.e.
instantly) a failed operation whereas Accord will
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java
b/accord-core/src/main/java/accord/local/CommandStore.java
index 726d13f3..afbafc71 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -69,6 +69,7 @@ import static accord.primitives.Routables.Slice.Minimal;
import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
import static accord.utils.Invariants.checkState;
import static accord.utils.Invariants.illegalState;
+import static accord.utils.Invariants.nonNull;
/**
* Single threaded internal shard of accord transaction metadata
@@ -220,7 +221,7 @@ public abstract class CommandStore implements AgentExecutor
protected void unsafeSetRangesForEpoch(RangesForEpoch newRangesForEpoch)
{
- rangesForEpoch = newRangesForEpoch;
+ rangesForEpoch = nonNull(newRangesForEpoch);
}
public abstract boolean inStore();
diff --git a/accord-core/src/main/java/accord/local/KeyHistory.java
b/accord-core/src/main/java/accord/local/KeyHistory.java
index 7f061d6e..40dfe4ef 100644
--- a/accord-core/src/main/java/accord/local/KeyHistory.java
+++ b/accord-core/src/main/java/accord/local/KeyHistory.java
@@ -26,6 +26,7 @@ public enum KeyHistory
{
TIMESTAMPS,
COMMANDS,
+ RECOVERY,
NONE;
public boolean isNone()
diff --git a/accord-core/src/main/java/accord/local/RedundantBefore.java
b/accord-core/src/main/java/accord/local/RedundantBefore.java
index 41b84ae8..ed97fade 100644
--- a/accord-core/src/main/java/accord/local/RedundantBefore.java
+++ b/accord-core/src/main/java/accord/local/RedundantBefore.java
@@ -299,6 +299,17 @@ public class RedundantBefore extends
ReducingRangeMap<RedundantBefore.Entry>
return safeToRead;
}
+ static TxnId minGcBefore(Entry entry, @Nullable TxnId minGcBefore)
+ {
+ if (entry == null)
+ return minGcBefore;
+
+ if (minGcBefore == null)
+ return entry.gcBefore;
+
+ return TxnId.min(minGcBefore, entry.gcBefore);
+ }
+
static Ranges expectToExecute(Entry entry, @Nonnull Ranges
executeRanges, TxnId txnId, @Nullable Timestamp executeAt)
{
if (entry == null || (executeAt == null ? entry.outOfBounds(txnId)
: entry.outOfBounds(txnId, executeAt)))
@@ -574,6 +585,11 @@ public class RedundantBefore extends
ReducingRangeMap<RedundantBefore.Entry>
return foldl(ranges, Entry::validateSafeToRead, ranges,
forBootstrapAt, null, r -> false);
}
+ public TxnId minGcBefore(Routables<?> participants)
+ {
+ return TxnId.nonNullOrMax(TxnId.NONE, foldl(participants, Entry::minGcBefore, null, ignore -> false));
+ }
+
/**
* Subtract any ranges we consider stale or pre-bootstrap
*/
diff --git a/accord-core/src/main/java/accord/messages/BeginRecovery.java
b/accord-core/src/main/java/accord/messages/BeginRecovery.java
index a2afd1b3..ad8bb5a0 100644
--- a/accord-core/src/main/java/accord/messages/BeginRecovery.java
+++ b/accord-core/src/main/java/accord/messages/BeginRecovery.java
@@ -215,7 +215,7 @@ public class BeginRecovery extends
TxnRequest.WithUnsynced<BeginRecovery.Recover
@Override
public KeyHistory keyHistory()
{
- return KeyHistory.COMMANDS;
+ return KeyHistory.RECOVERY;
}
@Override
diff --git a/accord-core/src/main/java/accord/utils/PersistentField.java
b/accord-core/src/main/java/accord/utils/PersistentField.java
index 3d872185..da66ffdc 100644
--- a/accord-core/src/main/java/accord/utils/PersistentField.java
+++ b/accord-core/src/main/java/accord/utils/PersistentField.java
@@ -102,11 +102,7 @@ public class PersistentField<Input, Saved>
Saved newValue = merge.apply(inputValue, startingValue);
this.latestPending = newValue;
int id = ++nextId;
-
- synchronized (this)
- {
- pending.add(new Pending<>(id, newValue));
- }
+ pending.add(new Pending<>(id, newValue));
AsyncResult<?> pendingWrite = persister.persist(inputValue, newValue);
pendingWrite.addCallback((success, fail) -> {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]