This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 13a3ae9297 Fix: - DurabilityQueue/ShardScheduler deadlock -
MemtableCleanerThread.Cleanup assumes Boolean parameter is non-null, which is
invalid if an exception has been thrown - AccordDurableOnFlush may be invoked
while Accord is starting up, so should use AccordService.unsafeInstance -
AccordCache shrink without lock regression - Cleanup system_accord compaction
leftovers before starting up - system_accord_debug.txn order -
system_accord_debug.txn_blocked_by order - sy [...]
13a3ae9297 is described below
commit 13a3ae92976ae0724b0d20cca12b80c0174fb4f7
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Fri Nov 28 15:20:28 2025 +0000
Fix:
- DurabilityQueue/ShardScheduler deadlock
- MemtableCleanerThread.Cleanup assumes Boolean parameter is non-null,
which is invalid if an exception has been thrown
- AccordDurableOnFlush may be invoked while Accord is starting up, so
should use AccordService.unsafeInstance
- AccordCache shrink without lock regression
- Cleanup system_accord compaction leftovers before starting up
- system_accord_debug.txn order
- system_accord_debug.txn_blocked_by order
- system_accord_debug.shard_epochs order
Improve:
- Set DefaultProgressLog.setMode(Catchup) during Catchup
- IdentityAccumulators only need to readLast, not readAll
- Limit the number of static segments we compact at once into sstables
- If too many static segments on startup, wait for them to be compacted
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-21053
---
modules/accord | 2 +-
.../org/apache/cassandra/config/AccordSpec.java | 7 ++
.../cassandra/db/virtual/AccordDebugKeyspace.java | 19 +++--
.../org/apache/cassandra/journal/Compactor.java | 17 ++++-
src/java/org/apache/cassandra/journal/Journal.java | 21 ++++++
src/java/org/apache/cassandra/journal/Params.java | 5 ++
.../org/apache/cassandra/journal/Segments.java | 11 +++
.../apache/cassandra/service/StartupChecks.java | 6 ++
.../cassandra/service/accord/AccordCache.java | 3 +-
.../cassandra/service/accord/AccordCacheEntry.java | 7 +-
.../service/accord/AccordDurableOnFlush.java | 2 +-
.../cassandra/service/accord/AccordJournal.java | 21 ++++--
.../service/accord/AccordJournalTable.java | 38 +++++++++-
.../cassandra/service/accord/AccordService.java | 86 ++++++++++++----------
.../cassandra/service/accord/DebugBlockedTxns.java | 6 ++
src/java/org/apache/cassandra/tcm/Startup.java | 2 +-
.../utils/memory/MemtableCleanerThread.java | 2 +-
.../db/virtual/AccordDebugKeyspaceTest.java | 51 +++++++++----
.../org/apache/cassandra/journal/TestParams.java | 6 ++
19 files changed, 236 insertions(+), 76 deletions(-)
diff --git a/modules/accord b/modules/accord
index 973335bfe7..8ccce74581 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 973335bfe7e930c6646016a4a48cff084f49b660
+Subproject commit 8ccce745818cf80c7cff82c3554e4a88e9e540db
diff --git a/src/java/org/apache/cassandra/config/AccordSpec.java
b/src/java/org/apache/cassandra/config/AccordSpec.java
index 1e2379a564..74fa27580b 100644
--- a/src/java/org/apache/cassandra/config/AccordSpec.java
+++ b/src/java/org/apache/cassandra/config/AccordSpec.java
@@ -197,6 +197,7 @@ public class AccordSpec
public static class JournalSpec implements Params
{
public int segmentSize = 32 << 20;
+ public int compactMaxSegments = 32;
public FailurePolicy failurePolicy = FailurePolicy.STOP;
public ReplayMode replayMode = ReplayMode.ONLY_NON_DURABLE;
public FlushMode flushMode = FlushMode.PERIODIC;
@@ -225,6 +226,12 @@ public class AccordSpec
return segmentSize;
}
+ @Override
+ public int compactMaxSegments()
+ {
+ return compactMaxSegments;
+ }
+
@Override
public FailurePolicy failurePolicy()
{
diff --git a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
index c0c4c82f29..de989d7f6e 100644
--- a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
@@ -146,6 +146,7 @@ import
org.apache.cassandra.service.consensus.migration.TableMigrationState;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.utils.LocalizeString;
+import org.apache.cassandra.utils.concurrent.Future;
import static accord.coordinate.Infer.InvalidIf.NotKnownToBeInvalid;
import static accord.local.RedundantStatus.Property.GC_BEFORE;
@@ -169,6 +170,7 @@ import static
org.apache.cassandra.db.virtual.VirtualTable.Sorted.ASC;
import static org.apache.cassandra.db.virtual.VirtualTable.Sorted.SORTED;
import static org.apache.cassandra.db.virtual.VirtualTable.Sorted.UNSORTED;
import static org.apache.cassandra.schema.SchemaConstants.VIRTUAL_ACCORD_DEBUG;
+import static org.apache.cassandra.service.accord.AccordService.toFuture;
import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
// TODO (expected): split into separate classes in own package
@@ -713,7 +715,9 @@ public class AccordDebugKeyspace extends VirtualKeyspace
collector.partition(commandStore.id())
.collect(rows -> {
// TODO (desired): support maybe execute immediately
with safeStore
-
AccordService.getBlocking(commandStore.chain((PreLoadContext.Empty)
metadata::toString, safeStore -> { addRows(safeStore, rows); }));
+ Future<?> future =
toFuture(commandStore.chain((PreLoadContext.Empty) metadata::toString,
safeStore -> { addRows(safeStore, rows); }));
+ if
(!future.awaitUntilThrowUncheckedOnInterrupt(collector.deadlineNanos()))
+ throw new InternalTimeoutException();
});
}
@@ -1601,7 +1605,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace
"Accord per-CommandStore Transaction State",
"CREATE TABLE %s (\n" +
" command_store_id int,\n" +
- " txn_id text,\n" +
+ " txn_id 'TxnIdUtf8Type',\n" +
" save_status text,\n" +
" route text,\n" +
" durability text,\n" +
@@ -1792,7 +1796,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace
case TRY_EXECUTE:
run(txnId, commandStoreId, safeStore -> {
SafeCommand safeCommand = safeStore.unsafeGet(txnId);
- Commands.maybeExecute(safeStore, safeCommand,
safeCommand.current(), true, true, NotifyWaitingOnPlus.adapter(null, true,
true));
+ Commands.maybeExecute(safeStore, safeCommand,
safeCommand.current(), true, true, NotifyWaitingOnPlus.adapter(ignore -> {},
true, true));
return AsyncChains.success(null);
});
break;
@@ -2089,7 +2093,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace
" blocked_by_txn_id 'TxnIdUtf8Type',\n" +
" save_status text,\n" +
" execute_at text,\n" +
- " PRIMARY KEY (txn_id, command_store_id, depth,
blocked_by_key, blocked_by_txn_id)" +
+ " PRIMARY KEY (txn_id, depth, command_store_id,
blocked_by_txn_id, blocked_by_key)" +
')', TxnIdUtf8Type.instance), BEST_EFFORT, ASC);
}
@@ -2111,7 +2115,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace
DebugBlockedTxns.visit(AccordService.unsafeInstance(),
txnId, maxDepth, collector.deadlineNanos(), txn -> {
String keyStr = txn.blockedViaKey == null ? "" :
txn.blockedViaKey.toString();
String txnIdStr = txn.txnId == null ||
txn.txnId.equals(txnId) ? "" : txn.txnId.toString();
- rows.add(txn.commandStoreId, txn.depth, keyStr,
txnIdStr)
+ rows.add(txn.depth, txn.commandStoreId, txnIdStr,
keyStr)
.eagerCollect(columns -> {
columns.add("save_status", txn.saveStatus,
TO_STRING)
.add("execute_at", txn.executeAt,
TO_STRING);
@@ -2160,8 +2164,9 @@ public class AccordDebugKeyspace extends VirtualKeyspace
" quorum_fast_privileged_deps int,\n" +
" quorum_fast_privileged_nodeps int,\n" +
" token_end 'TokenUtf8Type',\n" +
- " PRIMARY KEY (table_id, token_start, epoch_start)" +
- ')', UTF8Type.instance), FAIL, ASC);
+ " PRIMARY KEY (table_id, token_start, epoch_start))" +
+ " WITH CLUSTERING ORDER BY (token_start ASC,
epoch_start DESC);"
+ , UTF8Type.instance), FAIL, ASC);
}
@Override
diff --git a/src/java/org/apache/cassandra/journal/Compactor.java
b/src/java/org/apache/cassandra/journal/Compactor.java
index 7a1b19b088..b6749bc390 100644
--- a/src/java/org/apache/cassandra/journal/Compactor.java
+++ b/src/java/org/apache/cassandra/journal/Compactor.java
@@ -18,9 +18,9 @@
package org.apache.cassandra.journal;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
import org.apache.cassandra.concurrent.Shutdownable;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
import static
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
@@ -40,6 +41,7 @@ public final class Compactor<K, V> implements Runnable,
Shutdownable
private final SegmentCompactor<K, V> segmentCompactor;
private final ScheduledExecutorPlus executor;
private Future<?> scheduled;
+ public final WaitQueue compacted = WaitQueue.newWaitQueue();
Compactor(Journal<K, V> journal, SegmentCompactor<K, V> segmentCompactor)
{
@@ -73,11 +75,18 @@ public final class Compactor<K, V> implements Runnable,
Shutdownable
@Override
public void run()
{
- Set<StaticSegment<K, V>> toCompact = new HashSet<>();
+ List<StaticSegment<K, V>> toCompact = new ArrayList<>();
journal.segments().selectStatic(toCompact);
if (toCompact.isEmpty())
return;
+ int limit = journal.params.compactMaxSegments();
+ if (toCompact.size() > limit)
+ {
+ toCompact.sort(StaticSegment::compareTo);
+ toCompact.subList(limit, toCompact.size()).clear();
+ }
+
try
{
Collection<StaticSegment<K, V>> newSegments =
segmentCompactor.compact(toCompact);
@@ -88,6 +97,8 @@ public final class Compactor<K, V> implements Runnable,
Shutdownable
journal.replaceCompactedSegments(toCompact, newSegments);
for (StaticSegment<K, V> segment : toCompact)
segment.discard(journal);
+
+ compacted.signalAll();
}
catch (IOException e)
{
diff --git a/src/java/org/apache/cassandra/journal/Journal.java
b/src/java/org/apache/cassandra/journal/Journal.java
index 3120eb0916..ecfca32a52 100644
--- a/src/java/org/apache/cassandra/journal/Journal.java
+++ b/src/java/org/apache/cassandra/journal/Journal.java
@@ -222,6 +222,27 @@ public class Journal<K, V> implements Shutdownable
"Unexpected journal state after initialization",
state);
flusher.start();
compactor.start();
+
+ final int maxSegments = 100;
+ if (segments.get().count(Segment::isStatic) > maxSegments)
+ {
+ while (true)
+ {
+ WaitQueue.Signal signal = compactor.compacted.register();
+ int count = segments.get().count(Segment::isStatic);
+ if (count <= maxSegments)
+ {
+ signal.cancel();
+ logger.info("Only {} static segments; continuing with
startup", count);
+ break;
+ }
+ else
+ {
+ logger.info("Too many ({}) static segments; waiting until
some compacted before starting up", count);
+ signal.awaitThrowUncheckedOnInterrupt();
+ }
+ }
+ }
}
@VisibleForTesting
diff --git a/src/java/org/apache/cassandra/journal/Params.java
b/src/java/org/apache/cassandra/journal/Params.java
index 1e898ebce4..161165177d 100644
--- a/src/java/org/apache/cassandra/journal/Params.java
+++ b/src/java/org/apache/cassandra/journal/Params.java
@@ -31,6 +31,11 @@ public interface Params
*/
int segmentSize();
+ /**
+ * @return maximum number of static segments to compact at once to sstable
+ */
+ int compactMaxSegments();
+
/**
* @return this journal's {@link FailurePolicy}
*/
diff --git a/src/java/org/apache/cassandra/journal/Segments.java
b/src/java/org/apache/cassandra/journal/Segments.java
index bdd447ec6b..7245029fea 100644
--- a/src/java/org/apache/cassandra/journal/Segments.java
+++ b/src/java/org/apache/cassandra/journal/Segments.java
@@ -102,6 +102,17 @@ class Segments<K, V>
return this.segments.values();
}
+ public int count(Predicate<? super Segment<K, V>> predicate)
+ {
+ int count = 0;
+ for (Segment<K, V> segment : segments.values())
+ {
+ if (predicate.test(segment))
+ ++count;
+ }
+ return count;
+ }
+
/**
* Returns segments in timestamp order. Will allocate and sort the segment
collection.
*/
diff --git a/src/java/org/apache/cassandra/service/StartupChecks.java
b/src/java/org/apache/cassandra/service/StartupChecks.java
index 4ec3019465..11a27d7fe8 100644
--- a/src/java/org/apache/cassandra/service/StartupChecks.java
+++ b/src/java/org/apache/cassandra/service/StartupChecks.java
@@ -725,6 +725,12 @@ public class StartupChecks
for (TableMetadata cfm :
Schema.instance.getTablesAndViews(SchemaConstants.SYSTEM_KEYSPACE_NAME))
ColumnFamilyStore.scrubDataDirectories(cfm);
+ if (DatabaseDescriptor.getAccordTransactionsEnabled())
+ {
+ for (TableMetadata cfm :
Schema.instance.getTablesAndViews(SchemaConstants.ACCORD_KEYSPACE_NAME))
+ ColumnFamilyStore.scrubDataDirectories(cfm);
+ }
+
try
{
SystemKeyspace.checkHealth();
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCache.java
b/src/java/org/apache/cassandra/service/accord/AccordCache.java
index fc794f6dec..6c6eb167b8 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCache.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCache.java
@@ -259,8 +259,7 @@ public class AccordCache implements CacheSize
{
//noinspection LockAcquiredButNotSafelyReleased
lock.lock();
- node.tryApplyShrink(cur, upd);
- queue.addLast(node);
+ node.tryApplyShrink(cur, upd, queue);
}
}
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCacheEntry.java
b/src/java/org/apache/cassandra/service/accord/AccordCacheEntry.java
index 02ee11c3db..26f46334d6 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCacheEntry.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCacheEntry.java
@@ -30,6 +30,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Ints;
import accord.utils.ArrayBuffers.BufferList;
+import accord.utils.IntrusiveLinkedList;
import accord.utils.IntrusiveLinkedListNode;
import accord.utils.Invariants;
import accord.utils.async.Cancellable;
@@ -595,10 +596,14 @@ public class AccordCacheEntry<K, V> extends
IntrusiveLinkedListNode
return ((FailedToSave)state).cause;
}
- void tryApplyShrink(Object cur, Object upd)
+ void tryApplyShrink(Object cur, Object upd,
IntrusiveLinkedList<AccordCacheEntry<?,?>> queue)
{
+ if (references() > 0 || !isUnqueued())
+ return;
+
if (isLoaded() && unwrap() == cur && upd != cur && upd != null)
applyShrink(owner.parent(), cur, upd);
+ queue.addLast(this);
}
private void applyShrink(AccordCache.Type<K, V, ?> parent, Object cur,
Object upd)
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordDurableOnFlush.java
b/src/java/org/apache/cassandra/service/accord/AccordDurableOnFlush.java
index 96cf75d0cd..7f1bb70a04 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordDurableOnFlush.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordDurableOnFlush.java
@@ -58,7 +58,7 @@ class AccordDurableOnFlush implements Consumer<TableMetadata>
notify = commandStores;
commandStores = null;
}
- CommandStores commandStores =
AccordService.instance().node().commandStores();
+ CommandStores commandStores =
AccordService.unsafeInstance().node().commandStores();
for (Map.Entry<Integer, RedundantBefore> e : notify.entrySet())
{
RedundantBefore durable = e.getValue();
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 92c3629abe..885bbfa961 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -189,8 +189,9 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
Invariants.require(status == Status.INITIALIZED);
this.node = node;
status = Status.STARTING;
- journal.start();
+ // start table first to scrub directories before compactor starts
journalTable.start();
+ journal.start();
}
public boolean started()
@@ -320,28 +321,28 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
@Override
public RedundantBefore loadRedundantBefore(int commandStoreId)
{
- IdentityAccumulator<RedundantBefore> accumulator = readAll(new
JournalKey(TxnId.NONE, JournalKey.Type.REDUNDANT_BEFORE, commandStoreId));
+ IdentityAccumulator<RedundantBefore> accumulator = readLast(new
JournalKey(TxnId.NONE, JournalKey.Type.REDUNDANT_BEFORE, commandStoreId));
return accumulator.get();
}
@Override
public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId)
{
- IdentityAccumulator<NavigableMap<TxnId, Ranges>> accumulator =
readAll(new JournalKey(TxnId.NONE, JournalKey.Type.BOOTSTRAP_BEGAN_AT,
commandStoreId));
+ IdentityAccumulator<NavigableMap<TxnId, Ranges>> accumulator =
readLast(new JournalKey(TxnId.NONE, JournalKey.Type.BOOTSTRAP_BEGAN_AT,
commandStoreId));
return accumulator.get();
}
@Override
public NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId)
{
- IdentityAccumulator<NavigableMap<Timestamp, Ranges>> accumulator =
readAll(new JournalKey(TxnId.NONE, JournalKey.Type.SAFE_TO_READ,
commandStoreId));
+ IdentityAccumulator<NavigableMap<Timestamp, Ranges>> accumulator =
readLast(new JournalKey(TxnId.NONE, JournalKey.Type.SAFE_TO_READ,
commandStoreId));
return accumulator.get();
}
@Override
public CommandStores.RangesForEpoch loadRangesForEpoch(int commandStoreId)
{
- IdentityAccumulator<RangesForEpoch> accumulator = readAll(new
JournalKey(TxnId.NONE, JournalKey.Type.RANGES_FOR_EPOCH, commandStoreId));
+ IdentityAccumulator<RangesForEpoch> accumulator = readLast(new
JournalKey(TxnId.NONE, JournalKey.Type.RANGES_FOR_EPOCH, commandStoreId));
return accumulator.get();
}
@@ -520,6 +521,16 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
return builder;
}
+ public <BUILDER extends FlyweightImage> BUILDER readLast(JournalKey key)
+ {
+ BUILDER builder = (BUILDER) key.type.serializer.mergerFor();
+ builder.reset(key);
+ // TODO (expected): this can be further improved to avoid allocating
lambdas
+ AccordJournalValueSerializers.FlyweightSerializer<?, BUILDER>
serializer = (AccordJournalValueSerializers.FlyweightSerializer<?, BUILDER>)
key.type.serializer;
+ journalTable.readLast(key, (in, userVersion) ->
serializer.deserialize(key, builder, in, userVersion));
+ return builder;
+ }
+
public void forEachEntry(JournalKey key, AccordJournalTable.Reader reader)
{
journalTable.readAll(key, reader);
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
index 17aff49f86..40649cc432 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
@@ -365,7 +365,43 @@ public class AccordJournalTable<K extends JournalKey, V>
implements RangeSearche
}
}
}
-
+
+ public void readLast(K key, Reader reader)
+ {
+ readLast(key, new RecordConsumerAdapter<>(reader));
+ }
+
+ public void readLast(K key, RecordConsumer<K> reader)
+ {
+ try (TableKeyIterator table = readAllFromTable(key))
+ {
+ boolean hasTableData = table.advance();
+ long minSegment = hasTableData ? table.segment : Long.MIN_VALUE;
+
+ class JournalReader implements RecordConsumer<K>
+ {
+ boolean read;
+ @Override
+ public void accept(long segment, int position, K key,
ByteBuffer buffer, int userVersion)
+ {
+ if (segment > minSegment)
+ {
+ reader.accept(segment, position, key, buffer,
userVersion);
+ read = true;
+ }
+ }
+ }
+
+ // First, read all journal entries newer than anything flushed
into sstables
+ JournalReader journalReader = new JournalReader();
+ journal.readLast(key, journalReader);
+
+ // Then, read SSTables, if we haven't found a record already
+ if (hasTableData && !journalReader.read)
+ reader.accept(table.segment, table.offset, key, table.value,
table.userVersion);
+ }
+ }
+
// TODO (expected): why are recordColumn and versionColumn instance
fields, so that this cannot be a static class?
class TableKeyIterator implements Closeable, RecordConsumer<K>
{
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index cff5f26589..e7e5762fb1 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -38,6 +38,7 @@ import javax.annotation.concurrent.GuardedBy;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Ints;
+import accord.impl.progresslog.DefaultProgressLog;
import accord.local.Catchup;
import accord.topology.ActiveEpochs;
import accord.topology.EpochReady;
@@ -131,6 +132,7 @@ import
org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
import static accord.api.Journal.TopologyUpdate;
import static
accord.api.ProtocolModifiers.Toggles.FastExec.MAY_BYPASS_SAFESTORE;
+import static accord.impl.progresslog.DefaultProgressLog.ModeFlag.CATCH_UP;
import static accord.local.durability.DurabilityService.SyncLocal.Self;
import static accord.local.durability.DurabilityService.SyncRemote.All;
import static accord.messages.SimpleReply.Ok;
@@ -475,67 +477,75 @@ public class AccordService implements IAccordService,
Shutdownable
AccordSpec spec = DatabaseDescriptor.getAccord();
if (!spec.catchup_on_start)
{
- logger.info("Not catching up with peers");
+ logger.info("Catchup disabled; continuing to startup");
return;
}
BootstrapState bootstrapState = SystemKeyspace.getBootstrapState();
if (bootstrapState == COMPLETED)
{
- long maxLatencyNanos =
spec.catchup_on_start_fail_latency.toNanoseconds();
- int attempts = 1;
- while (true)
+ node.commandStores().forAllUnsafe(commandStore ->
((DefaultProgressLog)commandStore.unsafeProgressLog()).setMode(CATCH_UP));
+ try
{
- logger.info("Catching up with quorum...");
- long start = nanoTime();
- long failAt = start + maxLatencyNanos;
- Future<Void> f = toFuture(Catchup.catchup(node));
- if (!f.awaitUntilThrowUncheckedOnInterrupt(failAt))
+ long maxLatencyNanos =
spec.catchup_on_start_fail_latency.toNanoseconds();
+ int attempts = 1;
+ while (true)
{
- if (spec.catchup_on_start_exit_on_failure)
+ logger.info("Catchup with quorum...");
+ long start = nanoTime();
+ long failAt = start + maxLatencyNanos;
+ Future<Void> f = toFuture(Catchup.catchup(node));
+ if (!f.awaitUntilThrowUncheckedOnInterrupt(failAt))
{
- logger.error("Catch up exceeded maximum latency of
{}ns; shutting down", maxLatencyNanos);
- throw new RuntimeException("Could not catch up with
peers");
+ if (spec.catchup_on_start_exit_on_failure)
+ {
+ logger.error("Catchup exceeded maximum latency of
{}ns; shutting down", maxLatencyNanos);
+ throw new RuntimeException("Could not catchup with
peers");
+ }
+ logger.error("Catchup exceeded maximum latency of
{}ns; continuing to startup", maxLatencyNanos);
+ break;
}
- logger.error("Catch up exceeded maximum latency of {}ns;
starting up", maxLatencyNanos);
- break;
- }
- Throwable failed = f.cause();
- if (failed != null)
- {
- if (spec.catchup_on_start_exit_on_failure)
- throw new RuntimeException("Could not catch up with
peers", failed);
+ Throwable failed = f.cause();
+ if (failed != null)
+ {
+ if (spec.catchup_on_start_exit_on_failure)
+ throw new RuntimeException("Could not catchup with
peers", failed);
- logger.error("Could not catch up with peers; continuing to
startup");
- break;
- }
+ logger.error("Could not catchup with peers; continuing
to startup");
+ break;
+ }
- long end = nanoTime();
- double seconds = NANOSECONDS.toMillis(end - start)/1000.0;
- logger.info("Finished catching up with all quorums. {}s
elapsed.", String.format("%.2f", seconds));
+ long end = nanoTime();
+ double seconds = NANOSECONDS.toMillis(end - start)/1000.0;
+ logger.info("Finished catchup with all quorums. {}s
elapsed.", String.format("%.2f", seconds));
- if (seconds <=
spec.catchup_on_start_success_latency.toSeconds())
- break;
+ if (seconds <=
spec.catchup_on_start_success_latency.toSeconds())
+ break;
- if (++attempts > spec.catchup_on_start_max_attempts)
- {
- if (spec.catchup_on_start_exit_on_failure)
+ if (++attempts > spec.catchup_on_start_max_attempts)
{
- logger.error("Catch up was slow, aborting after {}
attempts and shutting down", attempts);
- throw new RuntimeException("Could not catch up with
peers");
+ if (spec.catchup_on_start_exit_on_failure)
+ {
+ logger.error("Catchup was slow, aborting after {}
attempts and shutting down", attempts);
+ throw new RuntimeException("Could not catchup with
peers");
+ }
+
+ logger.info("Catchup was slow; continuing to startup
after {} attempts.", attempts - 1);
+ break;
}
- logger.info("Catch up was slow; continuing to startup
after {} attempts.", attempts - 1);
- break;
+ logger.info("Catchup was slow, so we may behind again;
retrying");
}
-
- logger.info("Catch up was slow, so we may behind again;
retrying");
+ }
+ finally
+ {
+ node.commandStores().forAllUnsafe(commandStore ->
((DefaultProgressLog)commandStore.unsafeProgressLog()).unsetMode(CATCH_UP));
}
}
else
{
- logger.info("Not catching up with quorum, as bootstrap state is
{}", bootstrapState);
+ logger.info("No catchup, as bootstrap state is {}",
bootstrapState);
}
}
diff --git a/src/java/org/apache/cassandra/service/accord/DebugBlockedTxns.java
b/src/java/org/apache/cassandra/service/accord/DebugBlockedTxns.java
index a4ca0af5e1..7756c712b4 100644
--- a/src/java/org/apache/cassandra/service/accord/DebugBlockedTxns.java
+++ b/src/java/org/apache/cassandra/service/accord/DebugBlockedTxns.java
@@ -96,8 +96,14 @@ public class DebugBlockedTxns
int c = Integer.compare(this.commandStoreId, that.commandStoreId);
if (c == 0) c = Integer.compare(this.depth, that.depth);
if (c == 0) c = this.txnId.compareTo(that.txnId);
+ if (c == 0) c =
this.blockedViaKeyString().compareTo(that.blockedViaKeyString());
return c;
}
+
+ private String blockedViaKeyString()
+ {
+ return blockedViaKey == null ? "" : blockedViaKey.toString();
+ }
}
final IAccordService service;
diff --git a/src/java/org/apache/cassandra/tcm/Startup.java
b/src/java/org/apache/cassandra/tcm/Startup.java
index 53a0f6095a..1589e8db17 100644
--- a/src/java/org/apache/cassandra/tcm/Startup.java
+++ b/src/java/org/apache/cassandra/tcm/Startup.java
@@ -189,7 +189,7 @@ import static
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces())
{
// Skip system as we've already cleaned it
- if (keyspace.name.equals(SchemaConstants.SYSTEM_KEYSPACE_NAME))
+ if (keyspace.name.equals(SchemaConstants.SYSTEM_KEYSPACE_NAME) ||
keyspace.name.equals(SchemaConstants.ACCORD_KEYSPACE_NAME))
continue;
for (TableMetadata cfm : keyspace.tables)
diff --git
a/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
b/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
index 99929546f5..20b320dc05 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
@@ -92,7 +92,7 @@ public class MemtableCleanerThread<P extends MemtablePool>
implements Interrupti
final int tasks = numPendingTasks.decrementAndGet();
// if the cleaning job was scheduled (res == true) or had an
error, trigger again after decrementing the tasks
- if ((res || err != null) && pool.needsCleaning())
+ if (((res != null && res) || err != null) && pool.needsCleaning())
wait.signal();
if (err != null)
diff --git
a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
index 260d468af3..ffa8cd62db 100644
--- a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
+import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -193,6 +194,12 @@ public class AccordDebugKeyspaceTest extends CQLTester
private static final String QUERY_PATTERN_TRACE =
String.format("SELECT * FROM %s.%s WHERE id = ?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_PATTERN_TRACE);
+ private static final String QUERY_SHARD_EPOCHS =
+ String.format("SELECT * FROM %s.%s",
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.SHARD_EPOCHS);
+
+ private static final String QUERY_LISTENERS_DEPS =
+ String.format("SELECT * FROM %s.%s",
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.LISTENERS_DEPS);
+
@BeforeClass
public static void setUpClass()
{
@@ -215,6 +222,11 @@ public class AccordDebugKeyspaceTest extends CQLTester
requireNetwork();
}
+ @After
+ public void afterTest() throws Throwable
+ {
+ }
+
@Test
public void unknownIsEmpty()
{
@@ -492,7 +504,7 @@ public class AccordDebugKeyspaceTest extends CQLTester
getBlocking(accord.node().coordinate(id, txn));
filter.apply.awaitThrowUncheckedOnInterrupt();
spinUntilSuccess(() -> assertRows(execute(QUERY_TXN_BLOCKED_BY,
id.toString()),
- row(id.toString(), anyInt(), 0,
"", "", any(), "Applied")));
+ row(id.toString(), 0, anyInt(),
"", "", any(), "Applied")));
assertRows(execute(QUERY_TXN, id.toString()), row(id.toString(),
"Applied"));
assertRows(execute(QUERY_TXN_REMOTE, nodeId, id.toString()),
row(id.toString(), "Applied"));
assertRows(execute(QUERY_JOURNAL, id.toString()),
row(id.toString(), "PreAccepted"), row(id.toString(), "Applying"),
row(id.toString(), "Applied"), row(id.toString(), null));
@@ -577,14 +589,14 @@ public class AccordDebugKeyspaceTest extends CQLTester
filter.preAccept.awaitThrowUncheckedOnInterrupt();
assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()),
- row(id.toString(), anyInt(), 0, "", "", any(),
anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
+ row(id.toString(), 0, anyInt(), "", "", any(),
anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE, nodeId,
id.toString()),
- row(nodeId, id.toString(), anyInt(), 0, "", "", any(),
anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
+ row(nodeId, id.toString(), 0, anyInt(), "", "", any(),
anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
filter.apply.awaitThrowUncheckedOnInterrupt();
assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()),
- row(id.toString(), anyInt(), 0, "", "", any(),
SaveStatus.ReadyToExecute.name()));
+ row(id.toString(), 0, anyInt(), "", "", any(),
SaveStatus.ReadyToExecute.name()));
assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE, nodeId,
id.toString()),
- row(nodeId, id.toString(), anyInt(), 0, "", "", any(),
SaveStatus.ReadyToExecute.name()));
+ row(nodeId, id.toString(), 0, anyInt(), "", "", any(),
SaveStatus.ReadyToExecute.name()));
}
finally
{
@@ -619,14 +631,14 @@ public class AccordDebugKeyspaceTest extends CQLTester
filter.preAccept.awaitThrowUncheckedOnInterrupt();
assertRows(execute(QUERY_TXN_BLOCKED_BY, first.toString()),
- row(first.toString(), anyInt(), 0, "", any(), any(),
anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
+ row(first.toString(), 0, anyInt(), "", any(), any(),
anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE, nodeId,
first.toString()),
- row(nodeId, first.toString(), anyInt(), 0, "", any(),
any(), anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
+ row(nodeId, first.toString(), 0, anyInt(), "", any(),
any(), anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
filter.apply.awaitThrowUncheckedOnInterrupt();
assertRows(execute(QUERY_TXN_BLOCKED_BY, first.toString()),
- row(first.toString(), anyInt(), 0, "", any(),
anyNonNull(), SaveStatus.ReadyToExecute.name()));
+ row(first.toString(), 0, anyInt(), "", any(),
anyNonNull(), SaveStatus.ReadyToExecute.name()));
assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE, nodeId,
first.toString()),
- row(nodeId, first.toString(), anyInt(), 0, "", any(),
anyNonNull(), SaveStatus.ReadyToExecute.name()));
+ row(nodeId, first.toString(), 0, anyInt(), "", any(),
anyNonNull(), SaveStatus.ReadyToExecute.name()));
filter.reset();
@@ -643,15 +655,15 @@ public class AccordDebugKeyspaceTest extends CQLTester
return rs.size() == 2;
});
assertRows(execute(QUERY_TXN_BLOCKED_BY, second.toString()),
- row(second.toString(), anyInt(), 0, "", "",
anyNonNull(), SaveStatus.Stable.name()),
- row(second.toString(), anyInt(), 1, any(),
first.toString(), anyNonNull(), SaveStatus.ReadyToExecute.name()));
+ row(second.toString(), 0, anyInt(), "", "",
anyNonNull(), SaveStatus.Stable.name()),
+ row(second.toString(), 1, anyInt(), first.toString(),
any(), anyNonNull(), SaveStatus.ReadyToExecute.name()));
assertRows(execute(QUERY_TXN_BLOCKED_BY + " AND depth < 1",
second.toString()),
- row(second.toString(), anyInt(), 0, any(), "",
anyNonNull(), SaveStatus.Stable.name()));
+ row(second.toString(), 0, anyInt(), "", any(),
anyNonNull(), SaveStatus.Stable.name()));
assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE, nodeId,
second.toString()),
- row(nodeId, second.toString(), anyInt(), 0, "", "",
anyNonNull(), SaveStatus.Stable.name()),
- row(nodeId, second.toString(), anyInt(), 1, any(),
first.toString(), anyNonNull(), SaveStatus.ReadyToExecute.name()));
+ row(nodeId, second.toString(), 0, anyInt(), "", "",
anyNonNull(), SaveStatus.Stable.name()),
+ row(nodeId, second.toString(), 1, anyInt(),
first.toString(), any(), anyNonNull(), SaveStatus.ReadyToExecute.name()));
assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE + " AND depth < 1",
nodeId, second.toString()),
- row(nodeId, second.toString(), anyInt(), 0, any(), "",
anyNonNull(), SaveStatus.Stable.name()));
+ row(nodeId, second.toString(), 0, anyInt(), "", any(),
anyNonNull(), SaveStatus.Stable.name()));
}
finally
{
@@ -727,6 +739,15 @@ public class AccordDebugKeyspaceTest extends CQLTester
}
}
+ @Test
+ public void testShardEpochsTable()
+ {
+ String table1 = createTable("CREATE TABLE %s (k int, c int, v int,
PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
+ String table2 = createTable("CREATE TABLE %s (k int, c int, v int,
PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
+ UntypedResultSet rs = execute(QUERY_SHARD_EPOCHS);
+ Assert.assertTrue(rs.size() > 1);
+ }
+
private static AccordService accord()
{
return (AccordService) AccordService.instance();
diff --git a/test/unit/org/apache/cassandra/journal/TestParams.java
b/test/unit/org/apache/cassandra/journal/TestParams.java
index edf357a790..d464f00fad 100644
--- a/test/unit/org/apache/cassandra/journal/TestParams.java
+++ b/test/unit/org/apache/cassandra/journal/TestParams.java
@@ -31,6 +31,12 @@ public class TestParams implements Params
return 32 << 20;
}
+ @Override
+ public int compactMaxSegments()
+ {
+ return 16;
+ }
+
@Override
public FailurePolicy failurePolicy()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]