This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit b98954f0acb281101d42f76bad18a2051d114692
Author: Alex Petrov <oleksandr.pet...@gmail.com>
AuthorDate: Fri Jul 19 18:01:16 2024 +0200

    Add an ability to reconstruct arbitrary epoch state from the log to TCM
    
    Patch by Alex Petrov; reviewed by Marcus Eriksson for CASSANDRA-19790.
---
 .../org/apache/cassandra/metrics/TCMMetrics.java   |  2 +
 src/java/org/apache/cassandra/net/Verb.java        |  3 +
 .../schema/DistributedMetadataLogKeyspace.java     | 35 ++++++++
 .../cassandra/tcm/AbstractLocalProcessor.java      |  2 +-
 .../cassandra/tcm/AtomicLongBackedProcessor.java   | 10 +++
 .../cassandra/tcm/ClusterMetadataService.java      |  5 ++
 .../apache/cassandra/tcm/PaxosBackedProcessor.java |  5 ++
 src/java/org/apache/cassandra/tcm/Processor.java   |  4 +
 .../apache/cassandra/tcm/ReconstructLogState.java  | 86 ++++++++++++++++++++
 .../org/apache/cassandra/tcm/RemoteProcessor.java  | 24 ++++++
 .../cassandra/tcm/StubClusterMetadataService.java  |  6 ++
 .../org/apache/cassandra/tcm/log/LocalLog.java     |  7 +-
 .../org/apache/cassandra/tcm/log/LogReader.java    | 49 +++++++++++
 .../org/apache/cassandra/tcm/log/LogStorage.java   | 12 +++
 .../cassandra/tcm/migration/GossipProcessor.java   |  6 ++
 .../test/log/CoordinatorPathTestBase.java          |  5 ++
 .../distributed/test/log/ReconstructEpochTest.java | 94 ++++++++++++++++++++++
 .../distributed/test/log/TestProcessor.java        |  6 ++
 .../org/apache/cassandra/tcm/log/LocalLogTest.java |  5 ++
 19 files changed, 364 insertions(+), 2 deletions(-)

diff --git a/src/java/org/apache/cassandra/metrics/TCMMetrics.java 
b/src/java/org/apache/cassandra/metrics/TCMMetrics.java
index 134a1a34e2..68d5db55fe 100644
--- a/src/java/org/apache/cassandra/metrics/TCMMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TCMMetrics.java
@@ -62,6 +62,7 @@ public class TCMMetrics
     public final Meter progressBarrierCLRelax;
     public final Meter coordinatorBehindSchema;
     public final Meter coordinatorBehindPlacements;
+    public final Meter reconstructLogStateCall;
 
     private TCMMetrics()
     {
@@ -113,6 +114,7 @@ public class TCMMetrics
 
         coordinatorBehindSchema = 
Metrics.meter(factory.createMetricName("CoordinatorBehindSchema"));
         coordinatorBehindPlacements = 
Metrics.meter(factory.createMetricName("CoordinatorBehindPlacements"));
+        reconstructLogStateCall = 
Metrics.meter(factory.createMetricName("ReconstructLogStateCall"));
     }
 
     public void recordCommitFailureLatency(long latency, TimeUnit timeUnit, 
boolean isRejection)
diff --git a/src/java/org/apache/cassandra/net/Verb.java 
b/src/java/org/apache/cassandra/net/Verb.java
index 151b59c8df..c4d87139ea 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -130,6 +130,7 @@ import org.apache.cassandra.tcm.Discovery;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.FetchCMSLog;
 import org.apache.cassandra.tcm.FetchPeerLog;
+import org.apache.cassandra.tcm.ReconstructLogState;
 import org.apache.cassandra.tcm.migration.Election;
 import org.apache.cassandra.tcm.sequences.DataMovements;
 import org.apache.cassandra.tcm.serialization.MessageSerializers;
@@ -298,6 +299,8 @@ public enum Verb
     TCM_DISCOVER_REQ       (813, P0, rpcTimeout,      INTERNAL_METADATA,    () 
-> NoPayload.serializer,                         () -> 
Discovery.instance.requestHandler,    TCM_DISCOVER_RSP       ),
     TCM_FETCH_PEER_LOG_RSP (818, P0, rpcTimeout,      FETCH_LOG,            
MessageSerializers::logStateSerializer,             RESPONSE_HANDLER            
                     ),
     TCM_FETCH_PEER_LOG_REQ (819, P0, rpcTimeout,      FETCH_LOG,            () 
-> FetchPeerLog.serializer,                      () -> 
FetchPeerLog.Handler.instance,        TCM_FETCH_PEER_LOG_RSP ),
+    TCM_RECONSTRUCT_EPOCH_RSP (820, P0, rpcTimeout,   FETCH_LOG,            
MessageSerializers::logStateSerializer,             () -> 
ResponseVerbHandler.instance                                 ),
+    TCM_RECONSTRUCT_EPOCH_REQ (821, P0, rpcTimeout,   FETCH_LOG,            () 
-> ReconstructLogState.serializer,               () -> 
ReconstructLogState.Handler.instance, TCM_RECONSTRUCT_EPOCH_RSP ), // replies on its own dedicated RSP verb (820)
 
     INITIATE_DATA_MOVEMENTS_RSP (814, P1, rpcTimeout, MISC, () -> 
NoPayload.serializer,             RESPONSE_HANDLER                              
    ),
     INITIATE_DATA_MOVEMENTS_REQ (815, P1, rpcTimeout, MISC, () -> 
DataMovement.serializer,          () -> DataMovementVerbHandler.instance, 
INITIATE_DATA_MOVEMENTS_RSP ),
diff --git 
a/src/java/org/apache/cassandra/schema/DistributedMetadataLogKeyspace.java 
b/src/java/org/apache/cassandra/schema/DistributedMetadataLogKeyspace.java
index 26852475e7..9a3eaaf49d 100644
--- a/src/java/org/apache/cassandra/schema/DistributedMetadataLogKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/DistributedMetadataLogKeyspace.java
@@ -162,6 +162,20 @@ public final class DistributedMetadataLogKeyspace
         return (consistentFetch ? serialLogReader : 
localLogReader).getLogState(since);
     }
 
+    /**
+     * Reconstructs the log state by returning a _consistent_ base snapshot of 
a start epoch, and
+     * a list of transformations between start and end.
+     *
+     * TODO: this is a rather expensive operation, and should be used 
sparingly. If we decide we need to
+     *  rely on reconstructing arbitrary epochs during normal operation, we 
need to add a caching mechanism
+     *  here. One more alternative is to keep a lazily-initialized 
AccordTopology table on CMS nodes for a
+     *  number of recent epochs, and keep a node-local cache of this table on 
other nodes.
+     */
+    public static LogState getLogState(Epoch start, Epoch end)
+    {
+        return serialLogReader.getLogState(start, end);
+    }
+
     public static class DistributedTableLogReader implements LogReader
     {
         private final ConsistencyLevel consistencyLevel;
@@ -199,6 +213,27 @@ public final class DistributedMetadataLogKeyspace
             return entryHolder;
         }
 
+        public EntryHolder getEntries(Epoch since, Epoch until) throws 
IOException
+        {
+            // during gossip upgrade we have epoch = Long.MIN_VALUE + 1 (and 
the reverse partitioner doesn't support negative keys)
+            since = since.isBefore(Epoch.EMPTY) ? Epoch.EMPTY : since;
+            // note that we want all entries with since <= epoch <= until - but since 
we use a reverse partitioner, we actually
+            // want all entries where token(until) <= token(epoch) <= token(since)
+            UntypedResultSet resultSet = execute(String.format("SELECT epoch, 
kind, transformation, entry_id FROM %s.%s WHERE token(epoch) <= token(?) AND 
token(epoch) >= token(?)",
+                                                               
SchemaConstants.METADATA_KEYSPACE_NAME, TABLE_NAME),
+                                                 consistencyLevel, 
since.getEpoch(), until.getEpoch());
+            EntryHolder entryHolder = new EntryHolder(since);
+            for (UntypedResultSet.Row row : resultSet)
+            {
+                long entryId = row.getLong("entry_id");
+                Epoch epoch = Epoch.create(row.getLong("epoch"));
+                Transformation.Kind kind = 
Transformation.Kind.fromId(row.getInt("kind"));
+                Transformation transform = 
kind.fromVersionedBytes(row.getBlob("transformation"));
+                entryHolder.add(new Entry(new Entry.Id(entryId), epoch, 
transform));
+            }
+            return entryHolder;
+        }
+
         @Override
         public MetadataSnapshots snapshots()
         {
diff --git a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java 
b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java
index 247aa45670..f602e9a410 100644
--- a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java
@@ -183,7 +183,7 @@ public abstract class AbstractLocalProcessor implements 
Processor
             // We can use local log here since we always call this method only 
if local log is up-to-date:
             // in case of a successful commit, we apply against latest 
metadata locally before committing,
             // and in case of a rejection, we fetch latest entries to verify 
linearizability.
-            logState = log.getCommittedEntries(lastKnown);
+            logState = log.getLocalEntries(lastKnown);
         }
 
         return logState;
diff --git a/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java 
b/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java
index df00253433..41efdb245b 100644
--- a/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java
@@ -76,6 +76,11 @@ public class AtomicLongBackedProcessor extends 
AbstractLocalProcessor
         return log.waitForHighestConsecutive();
     }
 
+    public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, 
Retry.Deadline retryPolicy)
+    {
+        return log.getLocalEntries(lowEpoch);
+    }
+
     public static class InMemoryStorage implements LogStorage
     {
         private final List<Entry> entries;
@@ -130,6 +135,11 @@ public class AtomicLongBackedProcessor extends 
AbstractLocalProcessor
         {
             throw new IllegalStateException("We have overridden all callers of 
this method, it should never be called");
         }
+
+        public EntryHolder getEntries(Epoch since, Epoch until)
+        {
+            throw new IllegalStateException("We have overridden all callers of 
this method, it should never be called");
+        }
     }
 
     public static class InMemoryMetadataSnapshots implements MetadataSnapshots
diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java 
b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
index 7b022b04eb..ec740f183c 100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
@@ -845,6 +845,11 @@ public class ClusterMetadataService
             return delegate().fetchLogAndWait(waitFor, retryPolicy);
         }
 
+        public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, 
Retry.Deadline retryPolicy)
+        {
+            return delegate().reconstruct(lowEpoch, highEpoch, retryPolicy);
+        }
+
         public String toString()
         {
             return "SwitchableProcessor{" +
diff --git a/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java 
b/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java
index 45b5945cbc..dbaac24041 100644
--- a/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java
@@ -167,6 +167,11 @@ public class PaxosBackedProcessor extends 
AbstractLocalProcessor
         throw new ReadTimeoutException(ConsistencyLevel.QUORUM, blockFor - 
collected.size(), blockFor, false);
     }
 
+    public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, 
Retry.Deadline retryPolicy)
+    {
+        return DistributedMetadataLogKeyspace.getLogState(lowEpoch, highEpoch);
+    }
+
     private static <T> T unwrap(Promise<T> promise)
     {
         if (!promise.isDone() || !promise.isSuccess())
diff --git a/src/java/org/apache/cassandra/tcm/Processor.java 
b/src/java/org/apache/cassandra/tcm/Processor.java
index e3f12852c2..5f9e015200 100644
--- a/src/java/org/apache/cassandra/tcm/Processor.java
+++ b/src/java/org/apache/cassandra/tcm/Processor.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.metrics.TCMMetrics;
 import org.apache.cassandra.tcm.log.Entry;
+import org.apache.cassandra.tcm.log.LogState;
 
 public interface Processor
 {
@@ -69,5 +70,8 @@ public interface Processor
                                
Retry.Deadline.after(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
                                                     new 
Retry.Jitter(TCMMetrics.instance.fetchLogRetries)));
     }
+
     ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry.Deadline retryPolicy);
+
+    LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, Retry.Deadline 
retryPolicy);
 }
diff --git a/src/java/org/apache/cassandra/tcm/ReconstructLogState.java 
b/src/java/org/apache/cassandra/tcm/ReconstructLogState.java
new file mode 100644
index 0000000000..f6a60f070a
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/ReconstructLogState.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.metrics.TCMMetrics;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.DistributedMetadataLogKeyspace;
+import org.apache.cassandra.tcm.log.LogState;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class ReconstructLogState
+{
+    public static final Serializer serializer = new Serializer();
+
+    public final Epoch lowerBound;
+    public final Epoch higherBound;
+
+    public ReconstructLogState(Epoch lowerBound, Epoch higherBound)
+    {
+        this.lowerBound = lowerBound;
+        this.higherBound = higherBound;
+    }
+
+    static class Serializer implements 
IVersionedSerializer<ReconstructLogState>
+    {
+
+        public void serialize(ReconstructLogState t, DataOutputPlus out, int 
version) throws IOException
+        {
+            Epoch.serializer.serialize(t.lowerBound, out);
+            Epoch.serializer.serialize(t.higherBound, out);
+        }
+
+        public ReconstructLogState deserialize(DataInputPlus in, int version) 
throws IOException
+        {
+            Epoch lowerBound = Epoch.serializer.deserialize(in);
+            Epoch higherBound = Epoch.serializer.deserialize(in);
+            return new ReconstructLogState(lowerBound, higherBound);
+        }
+
+        public long serializedSize(ReconstructLogState t, int version)
+        {
+            return Epoch.serializer.serializedSize(t.lowerBound) +
+                   Epoch.serializer.serializedSize(t.higherBound);
+        }
+    }
+
+    public static class Handler implements IVerbHandler<ReconstructLogState>
+    {
+        public static final Handler instance = new Handler();
+
+        public void doVerb(Message<ReconstructLogState> message) throws 
IOException
+        {
+            TCMMetrics.instance.reconstructLogStateCall.mark();
+            ReconstructLogState request = message.payload;
+
+            if 
(!ClusterMetadataService.instance().isCurrentMember(FBUtilities.getBroadcastAddressAndPort()))
+                throw new NotCMSException("This node is not in the CMS, can't 
generate a consistent log fetch response to " + message.from());
+
+            LogState result = 
DistributedMetadataLogKeyspace.getLogState(request.lowerBound, 
request.higherBound);
+            MessagingService.instance().send(message.responseWith(result), 
message.from());
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java 
b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
index ec3943d5e9..101d5db90d 100644
--- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
@@ -150,6 +150,30 @@ public final class RemoteProcessor implements Processor
         }
     }
 
+    @Override
+    public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, 
Retry.Deadline retryPolicy)
+    {
+        try
+        {
+            Promise<LogState> request = new AsyncPromise<>();
+            List<InetAddressAndPort> candidates = new 
ArrayList<>(log.metadata().fullCMSMembers());
+            sendWithCallbackAsync(request,
+                                  Verb.TCM_RECONSTRUCT_EPOCH_REQ,
+                                  new ReconstructLogState(lowEpoch, highEpoch),
+                                  new CandidateIterator(candidates),
+                                  new 
Retry.Backoff(TCMMetrics.instance.fetchLogRetries));
+            return request.get(retryPolicy.remainingNanos(), 
TimeUnit.NANOSECONDS);
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException("Can not reconstruct during shutdown", 
e);
+        }
+        catch (ExecutionException | TimeoutException e)
+        {
+            throw new RuntimeException("Could not reconstruct", e);
+        }
+    }
+
     public static ClusterMetadata fetchLogAndWait(CandidateIterator 
candidateIterator, LocalLog log)
     {
         Promise<LogState> remoteRequest = new AsyncPromise<>();
diff --git a/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java 
b/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java
index 2a76375af3..abb3aa2603 100644
--- a/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java
+++ b/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java
@@ -33,6 +33,7 @@ import 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationState;
 import org.apache.cassandra.tcm.Commit.Replicator;
 import org.apache.cassandra.tcm.log.Entry;
 import org.apache.cassandra.tcm.log.LocalLog;
+import org.apache.cassandra.tcm.log.LogState;
 import org.apache.cassandra.tcm.membership.Directory;
 import org.apache.cassandra.tcm.ownership.DataPlacements;
 import org.apache.cassandra.tcm.ownership.PlacementProvider;
@@ -147,6 +148,11 @@ public class StubClusterMetadataService extends 
ClusterMetadataService
         {
             throw new UnsupportedOperationException();
         }
+
+        public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, 
Retry.Deadline retryPolicy)
+        {
+            throw new UnsupportedOperationException();
+        }
     }
 
 
diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java 
b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
index 0307e49048..5f749ca9b5 100644
--- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java
+++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
@@ -336,11 +336,16 @@ public abstract class LocalLog implements Closeable
         }
     }
 
-    public LogState getCommittedEntries(Epoch since)
+    public LogState getLocalEntries(Epoch since)
     {
         return storage.getLogState(since);
     }
 
+    public LogState getLocalEntries(Epoch since, Epoch until)
+    {
+        return storage.getLogState(since, until);
+    }
+
     public ClusterMetadata waitForHighestConsecutive()
     {
         runOnce();
diff --git a/src/java/org/apache/cassandra/tcm/log/LogReader.java 
b/src/java/org/apache/cassandra/tcm/log/LogReader.java
index b63ec4ecd6..901416c405 100644
--- a/src/java/org/apache/cassandra/tcm/log/LogReader.java
+++ b/src/java/org/apache/cassandra/tcm/log/LogReader.java
@@ -27,6 +27,7 @@ import java.util.TreeSet;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Ordering;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.MetadataSnapshots;
@@ -37,6 +38,7 @@ public interface LogReader
      * Gets all entries where epoch >= since - could be empty if since is a 
later epoch than the current highest seen
      */
     EntryHolder getEntries(Epoch since) throws IOException;
+    EntryHolder getEntries(Epoch since, Epoch until) throws IOException;
     MetadataSnapshots snapshots();
 
     /**
@@ -110,6 +112,53 @@ public interface LogReader
         }
     }
 
+    default LogState getLogState(Epoch start, Epoch end)
+    {
+        try
+        {
+            ClusterMetadata closestSnapshot = 
snapshots().getSnapshotBefore(start);
+
+            // Snapshot could not be found, fetch enough epochs to reconstruct 
the start metadata
+            if (closestSnapshot == null)
+            {
+                closestSnapshot = new 
ClusterMetadata(DatabaseDescriptor.getPartitioner());
+                ImmutableList.Builder<Entry> entries = new 
ImmutableList.Builder<>();
+                EntryHolder entryHolder = getEntries(Epoch.EMPTY, end);
+                for (Entry entry : entryHolder.entries)
+                {
+                    if (entry.epoch.isAfter(start))
+                        entries.add(entry);
+                    else
+                        closestSnapshot = 
entry.transform.execute(closestSnapshot).success().metadata;
+                }
+                return new LogState(closestSnapshot, entries.build());
+            }
+            else if (closestSnapshot.epoch.isBefore(start))
+            {
+                ImmutableList.Builder<Entry> entries = new 
ImmutableList.Builder<>();
+                EntryHolder entryHolder = getEntries(closestSnapshot.epoch, 
end);
+                for (Entry entry : entryHolder.entries)
+                {
+                    if (entry.epoch.isAfter(start))
+                        entries.add(entry);
+                    else
+                        closestSnapshot = 
entry.transform.execute(closestSnapshot).success().metadata;
+                }
+                return new LogState(closestSnapshot, entries.build());
+            }
+            else
+            {
+                assert closestSnapshot.epoch.isEqualOrAfter(start) : 
String.format("Got %s, but requested snapshot of %s", closestSnapshot.epoch, 
start);
+                EntryHolder entryHolder = 
getEntries(closestSnapshot.epoch.nextEpoch(), end);
+                return new LogState(closestSnapshot, 
ImmutableList.copyOf(entryHolder.entries));
+            }
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
     class EntryHolder
     {
         SortedSet<Entry> entries;
diff --git a/src/java/org/apache/cassandra/tcm/log/LogStorage.java 
b/src/java/org/apache/cassandra/tcm/log/LogStorage.java
index fbf60840c2..095f704a37 100644
--- a/src/java/org/apache/cassandra/tcm/log/LogStorage.java
+++ b/src/java/org/apache/cassandra/tcm/log/LogStorage.java
@@ -50,6 +50,12 @@ public interface LogStorage extends LogReader
             return LogState.EMPTY;
         }
 
+        @Override
+        public LogState getLogState(Epoch start, Epoch end)
+        {
+            return LogState.EMPTY;
+        }
+
         @Override
         public LogState getPersistedLogState()
         {
@@ -62,6 +68,12 @@ public interface LogStorage extends LogReader
             return null;
         }
 
+        @Override
+        public EntryHolder getEntries(Epoch since, Epoch until)
+        {
+            return null;
+        }
+
         @Override
         public MetadataSnapshots snapshots()
         {
diff --git a/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java 
b/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java
index beb71feb12..e79d4baf17 100644
--- a/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.tcm.log.Entry;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.Transformation;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.log.LogState;
 
 public class GossipProcessor implements Processor
 {
@@ -39,4 +40,9 @@ public class GossipProcessor implements Processor
     {
         return ClusterMetadata.current();
     }
+
+    public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, 
Retry.Deadline retryPolicy)
+    {
+        throw new IllegalStateException("Can't reconstruct log state when 
running in gossip mode. Enable the ClusterMetadataService with `nodetool 
addtocms`.");
+    }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java
index 64c4e629e3..59121b0aaa 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java
@@ -752,6 +752,11 @@ public abstract class CoordinatorPathTestBase extends 
FuzzTestBase
                                                log.append(logState);
                                                return 
log.waitForHighestConsecutive();
                                            }
+
+                                           public LogState reconstruct(Epoch 
lowEpoch, Epoch highEpoch, Retry.Deadline retryPolicy)
+                                           {
+                                               return 
log.getLocalEntries(lowEpoch, highEpoch);
+                                           }
                                        },
                                        (a,b) -> {},
                                        false);
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/ReconstructEpochTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/ReconstructEpochTest.java
new file mode 100644
index 0000000000..e89502e90c
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/ReconstructEpochTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.log;
+
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.metrics.TCMMetrics;
+import org.apache.cassandra.schema.DistributedMetadataLogKeyspace;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.Retry;
+import org.apache.cassandra.tcm.log.Entry;
+import org.apache.cassandra.tcm.log.LogState;
+
+public class ReconstructEpochTest extends TestBaseImpl
+{
+    @Test
+    public void logReaderTest() throws Exception
+    {
+        try (Cluster cluster = init(builder().withNodes(2).start()))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (id int 
primary key)"));
+            for (int i = 0; i < 30; i++)
+            {
+                if (i > 0 && i % 5 == 0)
+                    cluster.get(1).runOnInstance(() -> 
ClusterMetadataService.instance().triggerSnapshot());
+                cluster.schemaChange(withKeyspace("ALTER TABLE %s.tbl WITH 
comment = '" + i + "'"));
+            }
+
+            cluster.get(1).runOnInstance(() -> {
+                for (int[] cfg : new int[][]{ new int[]{ 6, 9 },
+                                              new int[]{ 2, 20 },
+                                              new int[]{ 5, 5 },
+                                              new int[]{ 15, 20 }})
+                {
+                    int start = cfg[0];
+                    int end = cfg[1];
+                    LogState logState = 
DistributedMetadataLogKeyspace.getLogState(Epoch.create(start), 
Epoch.create(end));
+                    Assert.assertEquals(start, 
logState.baseState.epoch.getEpoch());
+                    Iterator<Entry> iter = logState.entries.iterator();
+                    for (int i = start + 1; i <= end; i++)
+                        Assert.assertEquals(i, iter.next().epoch.getEpoch());
+                }
+            });
+
+
+            cluster.get(2).runOnInstance(() -> {
+                for (int[] cfg : new int[][]{ new int[]{ 6, 9 },
+                                              new int[]{ 2, 20 },
+                                              new int[]{ 5, 5 },
+                                              new int[]{ 15, 20 }})
+                {
+                    int start = cfg[0];
+                    int end = cfg[1];
+                    LogState logState = ClusterMetadataService.instance()
+                                                              .processor()
+                                                              
.reconstruct(Epoch.create(start),
+                                                                           
Epoch.create(end),
+                                                                           
Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
+                                                                               
                             TCMMetrics.instance.commitRetries));
+
+                    Assert.assertEquals(start, 
logState.baseState.epoch.getEpoch());
+                    Iterator<Entry> iter = logState.entries.iterator();
+                    for (int i = start + 1; i <= end; i++)
+                        Assert.assertEquals(i, iter.next().epoch.getEpoch());
+                }
+            });
+        }
+    }
+
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/TestProcessor.java 
b/test/distributed/org/apache/cassandra/distributed/test/log/TestProcessor.java
index f5fabfb4ac..6f359af057 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/TestProcessor.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/TestProcessor.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.tcm.Processor;
 import org.apache.cassandra.tcm.Retry;
 import org.apache.cassandra.tcm.Transformation;
 import org.apache.cassandra.tcm.log.Entry;
+import org.apache.cassandra.tcm.log.LogState;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,6 +70,11 @@ public class TestProcessor implements Processor
         return delegate.fetchLogAndWait(waitFor, retryPolicy);
     }
 
+    public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, 
Retry.Deadline retryPolicy)
+    {
+        return delegate.reconstruct(lowEpoch, highEpoch, retryPolicy);
+    }
+
     protected void waitIfPaused()
     {
         if (isPaused())
diff --git a/test/unit/org/apache/cassandra/tcm/log/LocalLogTest.java 
b/test/unit/org/apache/cassandra/tcm/log/LocalLogTest.java
index fbdafb131d..20bb5c79b0 100644
--- a/test/unit/org/apache/cassandra/tcm/log/LocalLogTest.java
+++ b/test/unit/org/apache/cassandra/tcm/log/LocalLogTest.java
@@ -145,6 +145,11 @@ public class LocalLogTest
                 return new EntryHolder(since);
             }
 
+            public EntryHolder getEntries(Epoch since, Epoch until) throws 
IOException
+            {
+                return new EntryHolder(since);
+            }
+
             @Override
             public MetadataSnapshots snapshots()
             {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to