This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push: new bd1622d5ec Various fixes and improvements Improve: - InMemoryJournal compaction should simulate files to ensure we compact the same cohorts of records (so shadowing applied correctly) - Don't update CFK deps if already known - avoid unnecessary heapification of LogGroupTimers - begin removal of Guava (mostly unused) - consider medium path on fast path delayed - add Seekables.indexOf to support faster serialization - unboxed Invariant.requires variant - AsyncChains.addCall [...] bd1622d5ec is described below commit bd1622d5ec80074471d768900cc4d43344d573bb Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Wed Apr 9 11:16:23 2025 +0100 Various fixes and improvements Improve: - InMemoryJournal compaction should simulate files to ensure we compact the same cohorts of records (so shadowing applied correctly) - Don't update CFK deps if already known - avoid unnecessary heapification of LogGroupTimers - begin removal of Guava (mostly unused) - consider medium path on fast path delayed - add Seekables.indexOf to support faster serialization - unboxed Invariant.requires variant - AsyncChains.addCallback -> invoke Fix: - Recovery must wait for earlier transactions on shards that have not yet been accepted, even if we recover a fast Stable record on another shard - only skip Dep calculation on PreAccept if vote is required by coordinator - ReadTracker must slice Minimal the first unavailable collection - If PreLoadContext cannot acquire shared caches, still consider existing contents - CFK: make sure to insert UNSTABLE into missing array - Fix failed task stops DelayedCommandStore task queue processing further tasks - short circuit AbstractRanges.equals() - Handle edge case where we can take fast/medium path but one of the Deps replies contains a future TxnId - update executeAtEpoch when retryInFutureEpoch - Fix incorrect validation in validateMissing (should only validate newInfo is not in missing when in witnessedBy; all other additions should be included) patch by Benedict; reviewed by David Capwell for CASSANDRA-20522 --- modules/accord | 2 +- .../service/accord/AccordConfigurationService.java | 6 +++--- .../org/apache/cassandra/service/accord/AccordJournal.java | 3 +-- .../org/apache/cassandra/service/accord/AccordService.java | 4 ++-- .../service/accord/interop/AccordInteropApply.java | 8 ++++---- .../service/accord/serializers/ApplySerializers.java | 14 +++++++++----- .../org/apache/cassandra/service/accord/txn/TxnWrite.java | 2 +- .../distributed/test/accord/AccordBootstrapTest.java | 2 +- .../cassandra/service/accord/AccordJournalBurnTest.java | 3 +-- .../org/apache/cassandra/service/accord/EpochSyncTest.java | 2 +- 10 files changed, 24 insertions(+), 22 deletions(-) diff --git a/modules/accord b/modules/accord index 1192d253f3..fc14a154fd 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit 1192d253f36d072b056f1d16e292bdf202018758 +Subproject commit fc14a154fd514d4ab40b37508fb9497f786835e0 diff --git a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java index 7b1e36f8f1..e0ca26ee5d 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java @@ -246,7 +246,7 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc .map(e -> tcmIdToAccord(e.getKey())) .collect(Collectors.toSet()); if (epochs.lastAcknowledged() >= topology.epoch()) checkIfNodesRemoved(topology, stillLiveNodes); - else epochs.acknowledgeFuture(topology.epoch()).addCallback(() -> checkIfNodesRemoved(topology, stillLiveNodes)); + else epochs.acknowledgeFuture(topology.epoch()).invokeIfSuccess(() -> checkIfNodesRemoved(topology, stillLiveNodes)); } private void checkIfNodesRemoved(Topology topology, Set<Node.Id> stillLiveNodes) @@ -336,7 +336,7 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc } } - getOrCreateEpochState(epoch - 1).acknowledged().addCallback(() -> reportMetadata(metadata)); + getOrCreateEpochState(epoch - 1).acknowledged().invokeIfSuccess(() -> reportMetadata(metadata)); } private final Map<Long, Future<Void>> pendingTopologies = new ConcurrentHashMap<>(); @@ -635,7 +635,7 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc public Future<Void> unsafeLocalSyncNotified(long epoch) { AsyncPromise<Void> promise = new AsyncPromise<>(); - getOrCreateEpochState(epoch).localSyncNotified().addCallback((result, failure) -> { + getOrCreateEpochState(epoch).localSyncNotified().invoke((result, failure) -> { if (failure != null) promise.tryFailure(failure); else promise.trySuccess(result); }); diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java index f4159f5731..7a29afb061 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java @@ -166,13 +166,12 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier return journal.currentActiveSegment().index().size(); } - public AccordJournal start(Node node) + public void start(Node node) { Invariants.require(status == Status.INITIALIZED); this.node = node; status = Status.STARTING; journal.start(); - return this; } public boolean started() diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index 75f5031ea4..d23e0c5598 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -744,7 +744,7 @@ public class AccordService implements IAccordService, Shutdownable List<AsyncChain<CommandStoreTxnBlockedGraph>> chains = new ArrayList<>(ids.length); for (int id : ids) chains.add(loadDebug(original, commandStores.forId(id))); - return AsyncChains.all(chains); + return AsyncChains.allOf(chains); } private AsyncChain<CommandStoreTxnBlockedGraph> loadDebug(TxnId txnId, CommandStore store) @@ -816,7 +816,7 @@ public class AccordService implements IAccordService, Shutdownable } if (chains.isEmpty()) return null; - return AsyncChains.all(chains).map(ignore -> null); + return AsyncChains.allOf(chains).map(ignore -> null); } private static AsyncChain<Void> populate(CommandStoreTxnBlockedGraph.Builder state, AccordSafeCommandStore safeStore, TokenKey pk, TxnId txnId, Timestamp executeAt) diff --git a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java index 7dd0a4f03b..b9af95db34 100644 --- a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java +++ b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java @@ -79,9 +79,9 @@ public class AccordInteropApply extends Apply implements LocalListeners.ComplexL public static final IVersionedSerializer<AccordInteropApply> serializer = new ApplySerializer<AccordInteropApply>() { @Override - protected AccordInteropApply deserializeApply(TxnId txnId, Route<?> scope, long minEpoch, long waitForEpoch, Apply.Kind kind, Timestamp executeAt, PartialDeps deps, PartialTxn txn, @Nullable FullRoute<?> fullRoute, Writes writes, Result result) + protected AccordInteropApply deserializeApply(TxnId txnId, Route<?> scope, long minEpoch, long waitForEpoch, long maxEpoch, Apply.Kind kind, Timestamp executeAt, PartialDeps deps, PartialTxn txn, @Nullable FullRoute<?> fullRoute, Writes writes, Result result) { - return new AccordInteropApply(kind, txnId, scope, minEpoch, waitForEpoch, executeAt, deps, txn, fullRoute, writes, result); + return new AccordInteropApply(kind, txnId, scope, minEpoch, waitForEpoch, maxEpoch, executeAt, deps, txn, fullRoute, writes, result); } }; @@ -89,9 +89,9 @@ public class AccordInteropApply extends Apply implements LocalListeners.ComplexL transient Int2ObjectHashMap<LocalListeners.Registered> listeners; boolean failed; - private AccordInteropApply(Kind kind, TxnId txnId, Route<?> route, long minEpoch, long waitForEpoch, Timestamp executeAt, PartialDeps deps, @Nullable PartialTxn txn, @Nullable FullRoute<?> fullRoute, Writes writes, Result result) + private AccordInteropApply(Kind kind, TxnId txnId, Route<?> route, long minEpoch, long waitForEpoch, long maxEpoch, Timestamp executeAt, PartialDeps deps, @Nullable PartialTxn txn, @Nullable FullRoute<?> fullRoute, Writes writes, Result result) { - super(kind, txnId, route, minEpoch, waitForEpoch, executeAt, deps, txn, fullRoute, writes, result); + super(kind, txnId, route, minEpoch, waitForEpoch, maxEpoch, executeAt, deps, txn, fullRoute, writes, result); } private AccordInteropApply(Kind kind, Id to, Topologies participates, TxnId txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result, FullRoute<?> fullRoute) diff --git a/src/java/org/apache/cassandra/service/accord/serializers/ApplySerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/ApplySerializers.java index d4fd7f3ce5..cf3f449354 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/ApplySerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/ApplySerializers.java @@ -66,6 +66,7 @@ public class ApplySerializers public void serializeBody(A apply, DataOutputPlus out, Version version) throws IOException { out.writeVInt(apply.minEpoch - apply.waitForEpoch); + out.writeUnsignedVInt(apply.maxEpoch - apply.minEpoch); kind.serialize(apply.kind, out); ExecuteAtSerializer.serialize(apply.txnId, apply.executeAt, out); DepsSerializers.partialDeps.serialize(apply.deps, out); @@ -75,13 +76,15 @@ public class ApplySerializers CommandSerializers.writes.serialize(apply.writes, out, version); } - protected abstract A deserializeApply(TxnId txnId, Route<?> scope, long minEpoch, long waitForEpoch, Apply.Kind kind, + protected abstract A deserializeApply(TxnId txnId, Route<?> scope, long minEpoch, long waitForEpoch, long maxEpoch, Apply.Kind kind, Timestamp executeAt, PartialDeps deps, PartialTxn txn, FullRoute<?> fullRoute, Writes writes, Result result); @Override public A deserializeBody(DataInputPlus in, Version version, TxnId txnId, Route<?> scope, long waitForEpoch) throws IOException { - return deserializeApply(txnId, scope, waitForEpoch + in.readVInt(), waitForEpoch, + long minEpoch = waitForEpoch + in.readVInt(); + long maxEpoch = minEpoch + in.readUnsignedVInt(); + return deserializeApply(txnId, scope, minEpoch, waitForEpoch, maxEpoch, kind.deserialize(in), ExecuteAtSerializer.deserialize(txnId, in), DepsSerializers.partialDeps.deserialize(in), @@ -95,6 +98,7 @@ public class ApplySerializers public long serializedBodySize(A apply, Version version) { return TypeSizes.sizeofVInt(apply.minEpoch - apply.waitForEpoch) + + TypeSizes.sizeofUnsignedVInt(apply.maxEpoch - apply.minEpoch) + kind.serializedSize(apply.kind) + ExecuteAtSerializer.serializedSize(apply.txnId, apply.executeAt) + DepsSerializers.partialDeps.serializedSize(apply.deps) @@ -107,10 +111,10 @@ public class ApplySerializers public static final IVersionedSerializer<Apply> request = new ApplySerializer<>() { @Override - protected Apply deserializeApply(TxnId txnId, Route<?> scope, long minEpoch, long waitForEpoch, Apply.Kind kind, - Timestamp executeAt, PartialDeps deps, PartialTxn txn, FullRoute<?> fullRoute, Writes writes, Result result) + protected Apply deserializeApply(TxnId txnId, Route<?> scope, long minEpoch, long waitForEpoch, long maxEpoch, Apply.Kind kind, + Timestamp executeAt, PartialDeps deps, PartialTxn txn, FullRoute<?> fullRoute, Writes writes, Result result) { - return Apply.SerializationSupport.create(txnId, scope, minEpoch, waitForEpoch, kind, executeAt, deps, txn, fullRoute, writes, result); + return Apply.SerializationSupport.create(txnId, scope, minEpoch, waitForEpoch, maxEpoch, kind, executeAt, deps, txn, fullRoute, writes, result); } }; diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java index d8e7da61e7..071dc67bee 100644 --- a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java +++ b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java @@ -411,7 +411,7 @@ public class TxnWrite extends AbstractKeySorted<TxnWrite.Update> implements Writ if (results.size() == 1) return results.get(0).flatMap(o -> Writes.SUCCESS); - return AsyncChains.all(results).flatMap(objects -> Writes.SUCCESS); + return AsyncChains.allOf(results).flatMap(objects -> Writes.SUCCESS); } public long estimatedSizeOnHeap() diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java index ea7dcd8814..30f4346cf8 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java @@ -296,7 +296,7 @@ public class AccordBootstrapTest extends TestBaseImpl for (long epoch = topologyManager.minEpoch() ; epoch <= topologyManager.epoch() ; ++epoch) { CountDownLatch latch = new CountDownLatch(1); - topologyManager.epochReady(epoch).data.addCallback(latch::countDown); + topologyManager.epochReady(epoch).data.invokeIfSuccess(latch::countDown); while (true) { try diff --git a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java index fbf2a9c874..8586a0ba08 100644 --- a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java +++ b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java @@ -182,11 +182,10 @@ public class AccordJournalBurnTest extends BurnTestBase }, directory, cfs) { @Override - public AccordJournal start(Node node) + public void start(Node node) { super.start(node); unsafeSetStarted(); - return this; } @Override diff --git a/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java b/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java index 68ec5058de..cc05e86db4 100644 --- a/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java +++ b/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java @@ -695,7 +695,7 @@ public class EpochSyncTest EpochReady ready = new EpochReady(topology.epoch(), metadata, coordination, data, reads); topology().onTopologyUpdate(topology, () -> ready, e -> {}); - ready.coordinate.addCallback(() -> topology().onEpochSyncComplete(id, topology.epoch())); + ready.coordinate.invokeIfSuccess(() -> topology().onEpochSyncComplete(id, topology.epoch())); if (topology().minEpoch() == topology.epoch() && topology().epoch() != topology.epoch()) return ready.coordinate; config.acknowledgeEpoch(ready, startSync); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org