This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new bd1622d5ec Various fixes and improvements Improve:  - InMemoryJournal 
compaction should simulate files to ensure we compact the same cohorts of 
records (so shadowing applied correctly)  - Don't update CFK deps if already 
known  - avoid unnecessary heapification of LogGroupTimers  - begin removal of 
Guava (mostly unused)  - consider medium path on fast path delayed  - add 
Seekables.indexOf to support faster serialization  - unboxed Invariant.requires 
variant  - AsyncChains.addCall [...]
bd1622d5ec is described below

commit bd1622d5ec80074471d768900cc4d43344d573bb
Author: Benedict Elliott Smith <bened...@apache.org>
AuthorDate: Wed Apr 9 11:16:23 2025 +0100

    Various fixes and improvements
    Improve:
     - InMemoryJournal compaction should simulate files to ensure we compact 
the same cohorts of records (so shadowing applied correctly)
     - Don't update CFK deps if already known
     - avoid unnecessary heapification of LogGroupTimers
     - begin removal of Guava (mostly unused)
     - consider medium path on fast path delayed
     - add Seekables.indexOf to support faster serialization
     - unboxed Invariant.requires variant
     - AsyncChains.addCallback -> invoke
    Fix:
     - Recovery must wait for earlier transactions on shards that have not yet 
been accepted, even if we recover a fast Stable record on another shard
     - only skip Dep calculation on PreAccept if vote is required by coordinator
     - ReadTracker must slice Minimal the first unavailable collection
     - If PreLoadContext cannot acquire shared caches, still consider existing 
contents
     - CFK: make sure to insert UNSTABLE into missing array
     - Fix failed task stops DelayedCommandStore task queue processing further 
tasks
     - short circuit AbstractRanges.equals()
     - Handle edge case where we can take fast/medium path but one of the Deps 
replies contains a future TxnId
     - update executeAtEpoch when retryInFutureEpoch
     - Fix incorrect validation in validateMissing (should only validate 
newInfo is not in missing when in witnessedBy; all other additions should be 
included)
    
    patch by Benedict; reviewed by David Capwell for CASSANDRA-20522
---
 modules/accord                                             |  2 +-
 .../service/accord/AccordConfigurationService.java         |  6 +++---
 .../org/apache/cassandra/service/accord/AccordJournal.java |  3 +--
 .../org/apache/cassandra/service/accord/AccordService.java |  4 ++--
 .../service/accord/interop/AccordInteropApply.java         |  8 ++++----
 .../service/accord/serializers/ApplySerializers.java       | 14 +++++++++-----
 .../org/apache/cassandra/service/accord/txn/TxnWrite.java  |  2 +-
 .../distributed/test/accord/AccordBootstrapTest.java       |  2 +-
 .../cassandra/service/accord/AccordJournalBurnTest.java    |  3 +--
 .../org/apache/cassandra/service/accord/EpochSyncTest.java |  2 +-
 10 files changed, 24 insertions(+), 22 deletions(-)

diff --git a/modules/accord b/modules/accord
index 1192d253f3..fc14a154fd 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 1192d253f36d072b056f1d16e292bdf202018758
+Subproject commit fc14a154fd514d4ab40b37508fb9497f786835e0
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java 
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
index 7b1e36f8f1..e0ca26ee5d 100644
--- 
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
+++ 
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
@@ -246,7 +246,7 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
                                                                .map(e -> 
tcmIdToAccord(e.getKey()))
                                                                
.collect(Collectors.toSet());
         if (epochs.lastAcknowledged() >= topology.epoch()) 
checkIfNodesRemoved(topology, stillLiveNodes);
-        else epochs.acknowledgeFuture(topology.epoch()).addCallback(() -> 
checkIfNodesRemoved(topology, stillLiveNodes));
+        else epochs.acknowledgeFuture(topology.epoch()).invokeIfSuccess(() -> 
checkIfNodesRemoved(topology, stillLiveNodes));
     }
 
     private void checkIfNodesRemoved(Topology topology, Set<Node.Id> 
stillLiveNodes)
@@ -336,7 +336,7 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
             }
         }
 
-        getOrCreateEpochState(epoch - 1).acknowledged().addCallback(() -> 
reportMetadata(metadata));
+        getOrCreateEpochState(epoch - 1).acknowledged().invokeIfSuccess(() -> 
reportMetadata(metadata));
     }
 
     private final Map<Long, Future<Void>> pendingTopologies = new 
ConcurrentHashMap<>();
@@ -635,7 +635,7 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
     public Future<Void> unsafeLocalSyncNotified(long epoch)
     {
         AsyncPromise<Void> promise = new AsyncPromise<>();
-        getOrCreateEpochState(epoch).localSyncNotified().addCallback((result, 
failure) -> {
+        getOrCreateEpochState(epoch).localSyncNotified().invoke((result, 
failure) -> {
             if (failure != null) promise.tryFailure(failure);
             else promise.trySuccess(result);
         });
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java 
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index f4159f5731..7a29afb061 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -166,13 +166,12 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
         return journal.currentActiveSegment().index().size();
     }
 
-    public AccordJournal start(Node node)
+    public void start(Node node)
     {
         Invariants.require(status == Status.INITIALIZED);
         this.node = node;
         status = Status.STARTING;
         journal.start();
-        return this;
     }
 
     public boolean started()
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java 
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 75f5031ea4..d23e0c5598 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -744,7 +744,7 @@ public class AccordService implements IAccordService, 
Shutdownable
         List<AsyncChain<CommandStoreTxnBlockedGraph>> chains = new 
ArrayList<>(ids.length);
         for (int id : ids)
             chains.add(loadDebug(original, commandStores.forId(id)));
-        return AsyncChains.all(chains);
+        return AsyncChains.allOf(chains);
     }
 
     private AsyncChain<CommandStoreTxnBlockedGraph> loadDebug(TxnId txnId, 
CommandStore store)
@@ -816,7 +816,7 @@ public class AccordService implements IAccordService, 
Shutdownable
         }
         if (chains.isEmpty())
             return null;
-        return AsyncChains.all(chains).map(ignore -> null);
+        return AsyncChains.allOf(chains).map(ignore -> null);
     }
 
     private static AsyncChain<Void> 
populate(CommandStoreTxnBlockedGraph.Builder state, AccordSafeCommandStore 
safeStore, TokenKey pk, TxnId txnId, Timestamp executeAt)
diff --git 
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java 
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
index 7dd0a4f03b..b9af95db34 100644
--- 
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
+++ 
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
@@ -79,9 +79,9 @@ public class AccordInteropApply extends Apply implements 
LocalListeners.ComplexL
     public static final IVersionedSerializer<AccordInteropApply> serializer = 
new ApplySerializer<AccordInteropApply>()
     {
         @Override
-        protected AccordInteropApply deserializeApply(TxnId txnId, Route<?> 
scope, long minEpoch, long waitForEpoch, Apply.Kind kind, Timestamp executeAt, 
PartialDeps deps, PartialTxn txn, @Nullable FullRoute<?> fullRoute, Writes 
writes, Result result)
+        protected AccordInteropApply deserializeApply(TxnId txnId, Route<?> 
scope, long minEpoch, long waitForEpoch, long maxEpoch, Apply.Kind kind, 
Timestamp executeAt, PartialDeps deps, PartialTxn txn, @Nullable FullRoute<?> 
fullRoute, Writes writes, Result result)
         {
-            return new AccordInteropApply(kind, txnId, scope, minEpoch, 
waitForEpoch, executeAt, deps, txn, fullRoute, writes, result);
+            return new AccordInteropApply(kind, txnId, scope, minEpoch, 
waitForEpoch, maxEpoch, executeAt, deps, txn, fullRoute, writes, result);
         }
     };
 
@@ -89,9 +89,9 @@ public class AccordInteropApply extends Apply implements 
LocalListeners.ComplexL
     transient Int2ObjectHashMap<LocalListeners.Registered> listeners;
     boolean failed;
 
-    private AccordInteropApply(Kind kind, TxnId txnId, Route<?> route, long 
minEpoch, long waitForEpoch, Timestamp executeAt, PartialDeps deps, @Nullable 
PartialTxn txn, @Nullable FullRoute<?> fullRoute, Writes writes, Result result)
+    private AccordInteropApply(Kind kind, TxnId txnId, Route<?> route, long 
minEpoch, long waitForEpoch, long maxEpoch, Timestamp executeAt, PartialDeps 
deps, @Nullable PartialTxn txn, @Nullable FullRoute<?> fullRoute, Writes 
writes, Result result)
     {
-        super(kind, txnId, route, minEpoch, waitForEpoch, executeAt, deps, 
txn, fullRoute, writes, result);
+        super(kind, txnId, route, minEpoch, waitForEpoch, maxEpoch, executeAt, 
deps, txn, fullRoute, writes, result);
     }
 
     private AccordInteropApply(Kind kind, Id to, Topologies participates, 
TxnId txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes 
writes, Result result, FullRoute<?> fullRoute)
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/ApplySerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/ApplySerializers.java
index d4fd7f3ce5..cf3f449354 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/ApplySerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/ApplySerializers.java
@@ -66,6 +66,7 @@ public class ApplySerializers
         public void serializeBody(A apply, DataOutputPlus out, Version 
version) throws IOException
         {
             out.writeVInt(apply.minEpoch - apply.waitForEpoch);
+            out.writeUnsignedVInt(apply.maxEpoch - apply.minEpoch);
             kind.serialize(apply.kind, out);
             ExecuteAtSerializer.serialize(apply.txnId, apply.executeAt, out);
             DepsSerializers.partialDeps.serialize(apply.deps, out);
@@ -75,13 +76,15 @@ public class ApplySerializers
                 CommandSerializers.writes.serialize(apply.writes, out, 
version);
         }
 
-        protected abstract A deserializeApply(TxnId txnId, Route<?> scope, 
long minEpoch, long waitForEpoch, Apply.Kind kind,
+        protected abstract A deserializeApply(TxnId txnId, Route<?> scope, 
long minEpoch, long waitForEpoch, long maxEpoch, Apply.Kind kind,
                                               Timestamp executeAt, PartialDeps 
deps, PartialTxn txn, FullRoute<?> fullRoute, Writes writes, Result result);
 
         @Override
         public A deserializeBody(DataInputPlus in, Version version, TxnId 
txnId, Route<?> scope, long waitForEpoch) throws IOException
         {
-            return deserializeApply(txnId, scope, waitForEpoch + 
in.readVInt(), waitForEpoch,
+            long minEpoch = waitForEpoch + in.readVInt();
+            long maxEpoch = minEpoch + in.readUnsignedVInt();
+            return deserializeApply(txnId, scope, minEpoch, waitForEpoch, 
maxEpoch,
                                     kind.deserialize(in),
                                     ExecuteAtSerializer.deserialize(txnId, in),
                                     
DepsSerializers.partialDeps.deserialize(in),
@@ -95,6 +98,7 @@ public class ApplySerializers
         public long serializedBodySize(A apply, Version version)
         {
             return   TypeSizes.sizeofVInt(apply.minEpoch - apply.waitForEpoch)
+                   + TypeSizes.sizeofUnsignedVInt(apply.maxEpoch - 
apply.minEpoch)
                    + kind.serializedSize(apply.kind)
                    + ExecuteAtSerializer.serializedSize(apply.txnId, 
apply.executeAt)
                    + DepsSerializers.partialDeps.serializedSize(apply.deps)
@@ -107,10 +111,10 @@ public class ApplySerializers
     public static final IVersionedSerializer<Apply> request = new 
ApplySerializer<>()
     {
         @Override
-        protected Apply deserializeApply(TxnId txnId, Route<?> scope, long 
minEpoch, long waitForEpoch, Apply.Kind kind,
-                               Timestamp executeAt, PartialDeps deps, 
PartialTxn txn, FullRoute<?> fullRoute, Writes writes, Result result)
+        protected Apply deserializeApply(TxnId txnId, Route<?> scope, long 
minEpoch, long waitForEpoch, long maxEpoch, Apply.Kind kind,
+                                         Timestamp executeAt, PartialDeps 
deps, PartialTxn txn, FullRoute<?> fullRoute, Writes writes, Result result)
         {
-            return Apply.SerializationSupport.create(txnId, scope, minEpoch, 
waitForEpoch, kind, executeAt, deps, txn, fullRoute, writes, result);
+            return Apply.SerializationSupport.create(txnId, scope, minEpoch, 
waitForEpoch, maxEpoch, kind, executeAt, deps, txn, fullRoute, writes, result);
         }
     };
 
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
index d8e7da61e7..071dc67bee 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
@@ -411,7 +411,7 @@ public class TxnWrite extends 
AbstractKeySorted<TxnWrite.Update> implements Writ
         if (results.size() == 1)
             return results.get(0).flatMap(o -> Writes.SUCCESS);
 
-        return AsyncChains.all(results).flatMap(objects -> Writes.SUCCESS);
+        return AsyncChains.allOf(results).flatMap(objects -> Writes.SUCCESS);
     }
 
     public long estimatedSizeOnHeap()
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
index ea7dcd8814..30f4346cf8 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
@@ -296,7 +296,7 @@ public class AccordBootstrapTest extends TestBaseImpl
                 for (long epoch = topologyManager.minEpoch() ; epoch <= 
topologyManager.epoch() ; ++epoch)
                 {
                     CountDownLatch latch = new CountDownLatch(1);
-                    
topologyManager.epochReady(epoch).data.addCallback(latch::countDown);
+                    
topologyManager.epochReady(epoch).data.invokeIfSuccess(latch::countDown);
                     while (true)
                     {
                         try
diff --git 
a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
 
b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
index fbf2a9c874..8586a0ba08 100644
--- 
a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
+++ 
b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
@@ -182,11 +182,10 @@ public class AccordJournalBurnTest extends BurnTestBase
                          }, directory, cfs)
                          {
                              @Override
-                             public AccordJournal start(Node node)
+                             public void start(Node node)
                              {
                                  super.start(node);
                                  unsafeSetStarted();
-                                 return this;
                              }
 
                              @Override
diff --git a/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java 
b/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
index 68ec5058de..cc05e86db4 100644
--- a/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
@@ -695,7 +695,7 @@ public class EpochSyncTest
                         EpochReady ready = new EpochReady(topology.epoch(), 
metadata, coordination, data, reads);
 
                         topology().onTopologyUpdate(topology, () -> ready, e 
-> {});
-                        ready.coordinate.addCallback(() -> 
topology().onEpochSyncComplete(id, topology.epoch()));
+                        ready.coordinate.invokeIfSuccess(() -> 
topology().onEpochSyncComplete(id, topology.epoch()));
                         if (topology().minEpoch() == topology.epoch() && 
topology().epoch() != topology.epoch())
                             return ready.coordinate;
                         config.acknowledgeEpoch(ready, startSync);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to