This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit b98954f0acb281101d42f76bad18a2051d114692 Author: Alex Petrov <oleksandr.pet...@gmail.com> AuthorDate: Fri Jul 19 18:01:16 2024 +0200 Add an ability to reconstruct arbitrary epoch state from the log to TCM Patch by Alex Petrov; reviewed by Marcus Eriksson for CASSANDRA-19790. --- .../org/apache/cassandra/metrics/TCMMetrics.java | 2 + src/java/org/apache/cassandra/net/Verb.java | 3 + .../schema/DistributedMetadataLogKeyspace.java | 35 ++++++++ .../cassandra/tcm/AbstractLocalProcessor.java | 2 +- .../cassandra/tcm/AtomicLongBackedProcessor.java | 10 +++ .../cassandra/tcm/ClusterMetadataService.java | 5 ++ .../apache/cassandra/tcm/PaxosBackedProcessor.java | 5 ++ src/java/org/apache/cassandra/tcm/Processor.java | 4 + .../apache/cassandra/tcm/ReconstructLogState.java | 86 ++++++++++++++++++++ .../org/apache/cassandra/tcm/RemoteProcessor.java | 24 ++++++ .../cassandra/tcm/StubClusterMetadataService.java | 6 ++ .../org/apache/cassandra/tcm/log/LocalLog.java | 7 +- .../org/apache/cassandra/tcm/log/LogReader.java | 49 +++++++++++ .../org/apache/cassandra/tcm/log/LogStorage.java | 12 +++ .../cassandra/tcm/migration/GossipProcessor.java | 6 ++ .../test/log/CoordinatorPathTestBase.java | 5 ++ .../distributed/test/log/ReconstructEpochTest.java | 94 ++++++++++++++++++++++ .../distributed/test/log/TestProcessor.java | 6 ++ .../org/apache/cassandra/tcm/log/LocalLogTest.java | 5 ++ 19 files changed, 364 insertions(+), 2 deletions(-) diff --git a/src/java/org/apache/cassandra/metrics/TCMMetrics.java b/src/java/org/apache/cassandra/metrics/TCMMetrics.java index 134a1a34e2..68d5db55fe 100644 --- a/src/java/org/apache/cassandra/metrics/TCMMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TCMMetrics.java @@ -62,6 +62,7 @@ public class TCMMetrics public final Meter progressBarrierCLRelax; public final Meter coordinatorBehindSchema; public final Meter coordinatorBehindPlacements; + public final Meter reconstructLogStateCall; private TCMMetrics() { @@ -113,6 +114,7 @@ public 
class TCMMetrics coordinatorBehindSchema = Metrics.meter(factory.createMetricName("CoordinatorBehindSchema")); coordinatorBehindPlacements = Metrics.meter(factory.createMetricName("CoordinatorBehindPlacements")); + reconstructLogStateCall = Metrics.meter(factory.createMetricName("ReconstructLogStateCall")); } public void recordCommitFailureLatency(long latency, TimeUnit timeUnit, boolean isRejection) diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java index 151b59c8df..c4d87139ea 100644 --- a/src/java/org/apache/cassandra/net/Verb.java +++ b/src/java/org/apache/cassandra/net/Verb.java @@ -130,6 +130,7 @@ import org.apache.cassandra.tcm.Discovery; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.FetchCMSLog; import org.apache.cassandra.tcm.FetchPeerLog; +import org.apache.cassandra.tcm.ReconstructLogState; import org.apache.cassandra.tcm.migration.Election; import org.apache.cassandra.tcm.sequences.DataMovements; import org.apache.cassandra.tcm.serialization.MessageSerializers; @@ -298,6 +299,8 @@ public enum Verb TCM_DISCOVER_REQ (813, P0, rpcTimeout, INTERNAL_METADATA, () -> NoPayload.serializer, () -> Discovery.instance.requestHandler, TCM_DISCOVER_RSP ), TCM_FETCH_PEER_LOG_RSP (818, P0, rpcTimeout, FETCH_LOG, MessageSerializers::logStateSerializer, RESPONSE_HANDLER ), TCM_FETCH_PEER_LOG_REQ (819, P0, rpcTimeout, FETCH_LOG, () -> FetchPeerLog.serializer, () -> FetchPeerLog.Handler.instance, TCM_FETCH_PEER_LOG_RSP ), + TCM_RECONSTRUCT_EPOCH_RSP (820, P0, rpcTimeout, FETCH_LOG, MessageSerializers::logStateSerializer, () -> ResponseVerbHandler.instance ), + TCM_RECONSTRUCT_EPOCH_REQ (821, P0, rpcTimeout, FETCH_LOG, () -> ReconstructLogState.serializer, () -> ReconstructLogState.Handler.instance, TCM_FETCH_PEER_LOG_RSP ), INITIATE_DATA_MOVEMENTS_RSP (814, P1, rpcTimeout, MISC, () -> NoPayload.serializer, RESPONSE_HANDLER ), INITIATE_DATA_MOVEMENTS_REQ (815, P1, rpcTimeout, MISC, () -> 
DataMovement.serializer, () -> DataMovementVerbHandler.instance, INITIATE_DATA_MOVEMENTS_RSP ), diff --git a/src/java/org/apache/cassandra/schema/DistributedMetadataLogKeyspace.java b/src/java/org/apache/cassandra/schema/DistributedMetadataLogKeyspace.java index 26852475e7..9a3eaaf49d 100644 --- a/src/java/org/apache/cassandra/schema/DistributedMetadataLogKeyspace.java +++ b/src/java/org/apache/cassandra/schema/DistributedMetadataLogKeyspace.java @@ -162,6 +162,20 @@ public final class DistributedMetadataLogKeyspace return (consistentFetch ? serialLogReader : localLogReader).getLogState(since); } + /** + * Reconstructs the log state by returning a _consistent_ base snapshot of a start epoch, and + * a list of transformations between start and end. + * + * TODO: this is a rather expensive operation, and should be used sparingly. If we decide we need to + * rely on reconstructing arbitrary epochs during normal operation, we need to add a caching mechanism + * here. One more alternative is to keep a lazily-initialized AccordTopology table on CMS nodes for a + * number of recent epochs, and keep a node-local cache of this table on other nodes. + */ + public static LogState getLogState(Epoch start, Epoch end) + { + return serialLogReader.getLogState(start, end); + } + public static class DistributedTableLogReader implements LogReader { private final ConsistencyLevel consistencyLevel; @@ -199,6 +213,27 @@ public final class DistributedMetadataLogKeyspace return entryHolder; } + public EntryHolder getEntries(Epoch since, Epoch until) throws IOException + { + // during gossip upgrade we have epoch = Long.MIN_VALUE + 1 (and the reverse partitioner doesn't support negative keys) + since = since.isBefore(Epoch.EMPTY) ?
Epoch.EMPTY : since; + // note that we want all entries with epoch >= since - but since we use a reverse partitioner, we actually + // want all entries where the token is less than token(since) + UntypedResultSet resultSet = execute(String.format("SELECT epoch, kind, transformation, entry_id FROM %s.%s WHERE token(epoch) <= token(?) AND token(epoch) >= token(?)", + SchemaConstants.METADATA_KEYSPACE_NAME, TABLE_NAME), + consistencyLevel, since.getEpoch(), until.getEpoch()); + EntryHolder entryHolder = new EntryHolder(since); + for (UntypedResultSet.Row row : resultSet) + { + long entryId = row.getLong("entry_id"); + Epoch epoch = Epoch.create(row.getLong("epoch")); + Transformation.Kind kind = Transformation.Kind.fromId(row.getInt("kind")); + Transformation transform = kind.fromVersionedBytes(row.getBlob("transformation")); + entryHolder.add(new Entry(new Entry.Id(entryId), epoch, transform)); + } + return entryHolder; + } + @Override public MetadataSnapshots snapshots() { diff --git a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java index 247aa45670..f602e9a410 100644 --- a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java +++ b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java @@ -183,7 +183,7 @@ public abstract class AbstractLocalProcessor implements Processor // We can use local log here since we always call this method only if local log is up-to-date: // in case of a successful commit, we apply against latest metadata locally before committing, // and in case of a rejection, we fetch latest entries to verify linearizability. 
- logState = log.getCommittedEntries(lastKnown); + logState = log.getLocalEntries(lastKnown); } return logState; diff --git a/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java b/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java index df00253433..41efdb245b 100644 --- a/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java +++ b/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java @@ -76,6 +76,11 @@ public class AtomicLongBackedProcessor extends AbstractLocalProcessor return log.waitForHighestConsecutive(); } + public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, Retry.Deadline retryPolicy) + { + return log.getLocalEntries(lowEpoch); + } + public static class InMemoryStorage implements LogStorage { private final List<Entry> entries; @@ -130,6 +135,11 @@ public class AtomicLongBackedProcessor extends AbstractLocalProcessor { throw new IllegalStateException("We have overridden all callers of this method, it should never be called"); } + + public EntryHolder getEntries(Epoch since, Epoch until) + { + throw new IllegalStateException("We have overridden all callers of this method, it should never be called"); + } } public static class InMemoryMetadataSnapshots implements MetadataSnapshots diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java index 7b022b04eb..ec740f183c 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java @@ -845,6 +845,11 @@ public class ClusterMetadataService return delegate().fetchLogAndWait(waitFor, retryPolicy); } + public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, Retry.Deadline retryPolicy) + { + return delegate().reconstruct(lowEpoch, highEpoch, retryPolicy); + } + public String toString() { return "SwitchableProcessor{" + diff --git a/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java 
b/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java index 45b5945cbc..dbaac24041 100644 --- a/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java +++ b/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java @@ -167,6 +167,11 @@ public class PaxosBackedProcessor extends AbstractLocalProcessor throw new ReadTimeoutException(ConsistencyLevel.QUORUM, blockFor - collected.size(), blockFor, false); } + public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, Retry.Deadline retryPolicy) + { + return DistributedMetadataLogKeyspace.getLogState(lowEpoch, highEpoch); + } + private static <T> T unwrap(Promise<T> promise) { if (!promise.isDone() || !promise.isSuccess()) diff --git a/src/java/org/apache/cassandra/tcm/Processor.java b/src/java/org/apache/cassandra/tcm/Processor.java index e3f12852c2..5f9e015200 100644 --- a/src/java/org/apache/cassandra/tcm/Processor.java +++ b/src/java/org/apache/cassandra/tcm/Processor.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.metrics.TCMMetrics; import org.apache.cassandra.tcm.log.Entry; +import org.apache.cassandra.tcm.log.LogState; public interface Processor { @@ -69,5 +70,8 @@ public interface Processor Retry.Deadline.after(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS), new Retry.Jitter(TCMMetrics.instance.fetchLogRetries))); } + ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry.Deadline retryPolicy); + + LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, Retry.Deadline retryPolicy); } diff --git a/src/java/org/apache/cassandra/tcm/ReconstructLogState.java b/src/java/org/apache/cassandra/tcm/ReconstructLogState.java new file mode 100644 index 0000000000..f6a60f070a --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/ReconstructLogState.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm; + +import java.io.IOException; + +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.metrics.TCMMetrics; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.DistributedMetadataLogKeyspace; +import org.apache.cassandra.tcm.log.LogState; +import org.apache.cassandra.utils.FBUtilities; + +public class ReconstructLogState +{ + public static final Serializer serializer = new Serializer(); + + public final Epoch lowerBound; + public final Epoch higherBound; + + public ReconstructLogState(Epoch lowerBound, Epoch higherBound) + { + this.lowerBound = lowerBound; + this.higherBound = higherBound; + } + + static class Serializer implements IVersionedSerializer<ReconstructLogState> + { + + public void serialize(ReconstructLogState t, DataOutputPlus out, int version) throws IOException + { + Epoch.serializer.serialize(t.lowerBound, out); + Epoch.serializer.serialize(t.higherBound, out); + } + + public ReconstructLogState deserialize(DataInputPlus in, int version) throws IOException + { + Epoch 
lowerBound = Epoch.serializer.deserialize(in); + Epoch higherBound = Epoch.serializer.deserialize(in); + return new ReconstructLogState(lowerBound, higherBound); + } + + public long serializedSize(ReconstructLogState t, int version) + { + return Epoch.serializer.serializedSize(t.lowerBound) + + Epoch.serializer.serializedSize(t.higherBound); + } + } + + public static class Handler implements IVerbHandler<ReconstructLogState> + { + public static final Handler instance = new Handler(); + + public void doVerb(Message<ReconstructLogState> message) throws IOException + { + TCMMetrics.instance.reconstructLogStateCall.mark(); + ReconstructLogState request = message.payload; + + if (!ClusterMetadataService.instance().isCurrentMember(FBUtilities.getBroadcastAddressAndPort())) + throw new NotCMSException("This node is not in the CMS, can't generate a consistent log fetch response to " + message.from()); + + LogState result = DistributedMetadataLogKeyspace.getLogState(request.lowerBound, request.higherBound); + MessagingService.instance().send(message.responseWith(result), message.from()); + } + } +} diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java index ec3943d5e9..101d5db90d 100644 --- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java +++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java @@ -150,6 +150,30 @@ public final class RemoteProcessor implements Processor } } + @Override + public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, Retry.Deadline retryPolicy) + { + try + { + Promise<LogState> request = new AsyncPromise<>(); + List<InetAddressAndPort> candidates = new ArrayList<>(log.metadata().fullCMSMembers()); + sendWithCallbackAsync(request, + Verb.TCM_RECONSTRUCT_EPOCH_REQ, + new ReconstructLogState(lowEpoch, highEpoch), + new CandidateIterator(candidates), + new Retry.Backoff(TCMMetrics.instance.fetchLogRetries)); + return request.get(retryPolicy.remainingNanos(), 
TimeUnit.NANOSECONDS); + } + catch (InterruptedException e) + { + throw new RuntimeException("Can not reconstruct during shutdown", e); + } + catch (ExecutionException | TimeoutException e) + { + throw new RuntimeException("Could not reconstruct", e); + } + } + public static ClusterMetadata fetchLogAndWait(CandidateIterator candidateIterator, LocalLog log) { Promise<LogState> remoteRequest = new AsyncPromise<>(); diff --git a/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java index 2a76375af3..abb3aa2603 100644 --- a/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java +++ b/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java @@ -33,6 +33,7 @@ import org.apache.cassandra.service.consensus.migration.ConsensusMigrationState; import org.apache.cassandra.tcm.Commit.Replicator; import org.apache.cassandra.tcm.log.Entry; import org.apache.cassandra.tcm.log.LocalLog; +import org.apache.cassandra.tcm.log.LogState; import org.apache.cassandra.tcm.membership.Directory; import org.apache.cassandra.tcm.ownership.DataPlacements; import org.apache.cassandra.tcm.ownership.PlacementProvider; @@ -147,6 +148,11 @@ public class StubClusterMetadataService extends ClusterMetadataService { throw new UnsupportedOperationException(); } + + public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, Retry.Deadline retryPolicy) + { + throw new UnsupportedOperationException(); + } } diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java b/src/java/org/apache/cassandra/tcm/log/LocalLog.java index 0307e49048..5f749ca9b5 100644 --- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java +++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java @@ -336,11 +336,16 @@ public abstract class LocalLog implements Closeable } } - public LogState getCommittedEntries(Epoch since) + public LogState getLocalEntries(Epoch since) { return storage.getLogState(since); } + public LogState 
getLocalEntries(Epoch since, Epoch until) + { + return storage.getLogState(since, until); + } + public ClusterMetadata waitForHighestConsecutive() { runOnce(); diff --git a/src/java/org/apache/cassandra/tcm/log/LogReader.java b/src/java/org/apache/cassandra/tcm/log/LogReader.java index b63ec4ecd6..901416c405 100644 --- a/src/java/org/apache/cassandra/tcm/log/LogReader.java +++ b/src/java/org/apache/cassandra/tcm/log/LogReader.java @@ -27,6 +27,7 @@ import java.util.TreeSet; import com.google.common.collect.ImmutableList; import com.google.common.collect.Ordering; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.MetadataSnapshots; @@ -37,6 +38,7 @@ public interface LogReader * Gets all entries where epoch >= since - could be empty if since is a later epoch than the current highest seen */ EntryHolder getEntries(Epoch since) throws IOException; + EntryHolder getEntries(Epoch since, Epoch until) throws IOException; MetadataSnapshots snapshots(); /** @@ -110,6 +112,53 @@ public interface LogReader } } + default LogState getLogState(Epoch start, Epoch end) + { + try + { + ClusterMetadata closestSnapshot = snapshots().getSnapshotBefore(start); + + // Snapshot could not be found, fetch enough epochs to reconstruct the start metadata + if (closestSnapshot == null) + { + closestSnapshot = new ClusterMetadata(DatabaseDescriptor.getPartitioner()); + ImmutableList.Builder<Entry> entries = new ImmutableList.Builder<>(); + EntryHolder entryHolder = getEntries(Epoch.EMPTY, end); + for (Entry entry : entryHolder.entries) + { + if (entry.epoch.isAfter(start)) + entries.add(entry); + else + closestSnapshot = entry.transform.execute(closestSnapshot).success().metadata; + } + return new LogState(closestSnapshot, entries.build()); + } + else if (closestSnapshot.epoch.isBefore(start)) + { + ImmutableList.Builder<Entry> entries = new ImmutableList.Builder<>(); + 
EntryHolder entryHolder = getEntries(closestSnapshot.epoch, end); + for (Entry entry : entryHolder.entries) + { + if (entry.epoch.isAfter(start)) + entries.add(entry); + else + closestSnapshot = entry.transform.execute(closestSnapshot).success().metadata; + } + return new LogState(closestSnapshot, entries.build()); + } + else + { + assert closestSnapshot.epoch.isEqualOrAfter(start) : String.format("Got %s, but requested snapshot of %s", closestSnapshot.epoch, start); + EntryHolder entryHolder = getEntries(closestSnapshot.epoch.nextEpoch(), end); + return new LogState(closestSnapshot, ImmutableList.copyOf(entryHolder.entries)); + } + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + class EntryHolder { SortedSet<Entry> entries; diff --git a/src/java/org/apache/cassandra/tcm/log/LogStorage.java b/src/java/org/apache/cassandra/tcm/log/LogStorage.java index fbf60840c2..095f704a37 100644 --- a/src/java/org/apache/cassandra/tcm/log/LogStorage.java +++ b/src/java/org/apache/cassandra/tcm/log/LogStorage.java @@ -50,6 +50,12 @@ public interface LogStorage extends LogReader return LogState.EMPTY; } + @Override + public LogState getLogState(Epoch start, Epoch end) + { + return LogState.EMPTY; + } + @Override public LogState getPersistedLogState() { @@ -62,6 +68,12 @@ public interface LogStorage extends LogReader return null; } + @Override + public EntryHolder getEntries(Epoch since, Epoch until) + { + return null; + } + @Override public MetadataSnapshots snapshots() { diff --git a/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java b/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java index beb71feb12..e79d4baf17 100644 --- a/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java +++ b/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java @@ -25,6 +25,7 @@ import org.apache.cassandra.tcm.log.Entry; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.Transformation; import 
org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.log.LogState; public class GossipProcessor implements Processor { @@ -39,4 +40,9 @@ public class GossipProcessor implements Processor { return ClusterMetadata.current(); } + + public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, Retry.Deadline retryPolicy) + { + throw new IllegalStateException("Can't reconstruct log state when running in gossip mode. Enable the ClusterMetadataService with `nodetool addtocms`."); + } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java index 64c4e629e3..59121b0aaa 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java @@ -752,6 +752,11 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase log.append(logState); return log.waitForHighestConsecutive(); } + + public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, Retry.Deadline retryPolicy) + { + return log.getLocalEntries(lowEpoch, highEpoch); + } }, (a,b) -> {}, false); diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/ReconstructEpochTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/ReconstructEpochTest.java new file mode 100644 index 0000000000..e89502e90c --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/log/ReconstructEpochTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.log; + +import java.util.Iterator; +import java.util.concurrent.TimeUnit; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.metrics.TCMMetrics; +import org.apache.cassandra.schema.DistributedMetadataLogKeyspace; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.Retry; +import org.apache.cassandra.tcm.log.Entry; +import org.apache.cassandra.tcm.log.LogState; + +public class ReconstructEpochTest extends TestBaseImpl +{ + @Test + public void logReaderTest() throws Exception + { + try (Cluster cluster = init(builder().withNodes(2).start())) + { + cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (id int primary key)")); + for (int i = 0; i < 30; i++) + { + if (i > 0 && i % 5 == 0) + cluster.get(1).runOnInstance(() -> ClusterMetadataService.instance().triggerSnapshot()); + cluster.schemaChange(withKeyspace("ALTER TABLE %s.tbl WITH comment = '" + i + "'")); + } + + cluster.get(1).runOnInstance(() -> { + for (int[] cfg : new int[][]{ new int[]{ 6, 9 }, + new int[]{ 2, 20 }, + new int[]{ 5, 5 }, + new int[]{ 15, 20 }}) + { + int start = cfg[0]; + int end = cfg[1]; + LogState logState = DistributedMetadataLogKeyspace.getLogState(Epoch.create(start), Epoch.create(end)); + Assert.assertEquals(start, 
logState.baseState.epoch.getEpoch()); + Iterator<Entry> iter = logState.entries.iterator(); + for (int i = start + 1; i <= end; i++) + Assert.assertEquals(i, iter.next().epoch.getEpoch()); + } + }); + + + cluster.get(2).runOnInstance(() -> { + for (int[] cfg : new int[][]{ new int[]{ 6, 9 }, + new int[]{ 2, 20 }, + new int[]{ 5, 5 }, + new int[]{ 15, 20 }}) + { + int start = cfg[0]; + int end = cfg[1]; + LogState logState = ClusterMetadataService.instance() + .processor() + .reconstruct(Epoch.create(start), + Epoch.create(end), + Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS), + TCMMetrics.instance.commitRetries)); + + Assert.assertEquals(start, logState.baseState.epoch.getEpoch()); + Iterator<Entry> iter = logState.entries.iterator(); + for (int i = start + 1; i <= end; i++) + Assert.assertEquals(i, iter.next().epoch.getEpoch()); + } + }); + } + } + +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/TestProcessor.java b/test/distributed/org/apache/cassandra/distributed/test/log/TestProcessor.java index f5fabfb4ac..6f359af057 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/TestProcessor.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/TestProcessor.java @@ -32,6 +32,7 @@ import org.apache.cassandra.tcm.Processor; import org.apache.cassandra.tcm.Retry; import org.apache.cassandra.tcm.Transformation; import org.apache.cassandra.tcm.log.Entry; +import org.apache.cassandra.tcm.log.LogState; import org.apache.cassandra.utils.concurrent.WaitQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +70,11 @@ public class TestProcessor implements Processor return delegate.fetchLogAndWait(waitFor, retryPolicy); } + public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, Retry.Deadline retryPolicy) + { + return delegate.reconstruct(lowEpoch, highEpoch, retryPolicy); + } + protected void waitIfPaused() { if (isPaused()) diff --git 
a/test/unit/org/apache/cassandra/tcm/log/LocalLogTest.java b/test/unit/org/apache/cassandra/tcm/log/LocalLogTest.java index fbdafb131d..20bb5c79b0 100644 --- a/test/unit/org/apache/cassandra/tcm/log/LocalLogTest.java +++ b/test/unit/org/apache/cassandra/tcm/log/LocalLogTest.java @@ -145,6 +145,11 @@ public class LocalLogTest return new EntryHolder(since); } + public EntryHolder getEntries(Epoch since, Epoch until) throws IOException + { + return new EntryHolder(since); + } + @Override public MetadataSnapshots snapshots() { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org