belliottsmith commented on code in PR #4738:
URL: https://github.com/apache/cassandra/pull/4738#discussion_r3274137790


##########
src/java/org/apache/cassandra/service/accord/journal/AccordJournal.java:
##########
@@ -0,0 +1,696 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.accord.journal;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.agrona.collections.Long2LongHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.impl.CommandChange;
+import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.CommandStores;
+import accord.local.CommandStores.RangesForEpoch;
+import accord.local.DurableBefore;
+import accord.local.Node;
+import accord.local.RedundantBefore;
+import accord.primitives.EpochSupplier;
+import accord.primitives.Ranges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.utils.Invariants;
+import accord.utils.PersistentField;
+
+import org.apache.cassandra.config.AccordSpec.JournalSpec;
+import org.apache.cassandra.config.AccordSpec.JournalSpec.ReplayMode;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.journal.Compactor;
+import org.apache.cassandra.journal.Descriptor;
+import org.apache.cassandra.journal.Journal;
+import org.apache.cassandra.journal.Params;
+import org.apache.cassandra.journal.RecordConsumer;
+import org.apache.cassandra.journal.RecordPointer;
+import org.apache.cassandra.journal.Segments;
+import org.apache.cassandra.journal.ValueSerializer;
+import org.apache.cassandra.service.accord.AccordKeyspace;
+import org.apache.cassandra.service.accord.JournalKey;
+import org.apache.cassandra.service.accord.journal.Merger.KeepFirst;
+import org.apache.cassandra.service.accord.journal.RangeSearcher.NoopJournalRangeSearcher;
+import org.apache.cassandra.service.accord.serializers.Version;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+import static accord.api.Journal.Load.MINIMAL;
+import static accord.api.Journal.Load.MINIMAL_WITH_DEPS;
+import static accord.local.Cleanup.Input.FULL;
+import static org.apache.cassandra.config.AccordSpec.RangeIndexMode.journal_sai;
+import static org.apache.cassandra.config.DatabaseDescriptor.getAccord;
+import static org.apache.cassandra.config.DatabaseDescriptor.getAccordJournalDirectory;
+import static org.apache.cassandra.service.accord.JournalKey.Type.COMMAND_DIFF;
+import static org.apache.cassandra.service.accord.journal.ReplayMarkers.safeStopMarker;
+import static org.apache.cassandra.service.accord.journal.ReplayMarkers.startMarker;
+import static org.apache.cassandra.service.accord.journal.ReplayMarkers.writeMarker;
+import static org.apache.cassandra.service.accord.journal.TopologyRecord.newTopology;
+
+public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier
+{
+    private static final Logger logger = LoggerFactory.getLogger(AccordJournal.class);
+
+    @VisibleForTesting
+    protected final Journal<JournalKey, Object> segments;
+    protected final ColumnFamilyStore table;
+    @VisibleForTesting
+    protected final @Nullable RangeSearchManager rangeSearch;
+    protected final OpOrder readOrder;
+    private final Params params;
+
+    public AccordJournal(Params params)
+    {
+        this(params, new File(getAccordJournalDirectory()), Keyspace.open(AccordKeyspace.metadata().name).getColumnFamilyStore(AccordKeyspace.JOURNAL));
+    }
+
+    @VisibleForTesting
+    public AccordJournal(Params params, File directory, ColumnFamilyStore table)
+    {
+        Version userVersion = Version.fromVersion(params.userVersion());
+        this.rangeSearch = RangeSearchManager.ifEnabled(table);
+        this.table = table;
+        this.readOrder = table.readOrdering;
+        this.params = params;
+        // initialise journal last because we call a self method to initialise its compactor
+        this.segments = new Journal<>("AccordJournal", directory, params, JournalKey.SUPPORT,
+                                      new ValueSerializer.Unsupported<>(),
+                                      compactor(table, userVersion),
+                                      table.readOrdering);
+    }
+
+    @Override
+    public void open(Node node)
+    {
+        segments.open();
+    }
+
+    public void start(Node node)
+    {
+        if (rangeSearch != null)
+            rangeSearch.start();
+
+        long maxTableDescriptor = maxTableDescriptor();
+        segments.start(maxTableDescriptor);
+    }
+
+    public Descriptor stop()
+    {
+        return segments.stop();
+    }
+
+    public void close()
+    {
+        segments.close();
+    }
+
+    public boolean awaitTerminationUntil(long deadlineNanos) throws InterruptedException
+    {
+        try
+        {
+            segments.awaitTerminationUntil(deadlineNanos);
+            return true;
+        }
+        catch (TimeoutException e)
+        {
+            return false;
+        }
+    }
+
+    @Override
+    public void saveCommand(int commandStoreId, CommandUpdate update, @Nullable Runnable onFlush)
+    {
+        CommandChangeWriter change = CommandChangeWriter.make(update.before, update.after);
+        if (change == null)
+        {
+            if (onFlush != null)
+                onFlush.run();
+            return;
+        }
+
+        JournalKey key = new JournalKey(update.txnId, COMMAND_DIFF, commandStoreId);
+        RecordPointer pointer = segments.asyncWrite(key, change);
+        if (rangeSearch != null)
+            onFlush = merge(onFlush, rangeSearch.maybeIndex(key, pointer, change));
+        if (onFlush != null)
+            segments.onDurable(pointer, onFlush);
+    }
+
+    <T> void append(JournalKey key, T write, Runnable onFlush)
+    {
+        RecordPointer pointer = appendInternal(key, write);
+        if (onFlush != null)
+            segments.onDurable(pointer, onFlush);
+    }
+
+    private <T> RecordPointer appendInternal(JournalKey key, T write)
+    {
+        MergeSerializer<T, ?, ?> serializer = (MergeSerializer<T, ?, ?>) key.type.serializer;
+        return segments.asyncWrite(key, (out, userVersion) -> serializer.serialize(key, write, out, Version.fromVersion(userVersion)));
+    }
+
+
+    public long maxTableDescriptor()
+    {
+        return table.getTracker().getView().liveSSTables()
+                    .stream()
+                    .filter(sst -> sst.getSSTableMetadata().totalRows > 0)
+                    .map(sst -> LongType.instance.compose(sst.getSSTableMetadata().coveredClustering.end().bufferAt(0)))
+                    .max(Long::compare).orElse(0L);
+    }
+
+    public long maxDescriptor()
+    {
+        return Math.max(segments.maxDescriptor(), maxTableDescriptor());
+    }
+
+    public Params configuration()
+    {
+        return params;
+    }
+
+    @Override
+    public Command loadCommand(int commandStoreId, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore)
+    {
+        CommandChanges builder = load(commandStoreId, txnId);
+        builder.maybeCleanup(true, FULL, redundantBefore, durableBefore);
+        return builder.construct(redundantBefore);
+    }
+
+    @Override
+    public List<DebugEntry> debugCommand(int commandStoreId, TxnId txnId)
+    {
+        JournalKey key = new JournalKey(txnId, COMMAND_DIFF, commandStoreId);
+        List<DebugEntry> result = new ArrayList<>();
+        readAll(key, (long segment, int position, JournalKey k, ByteBuffer buffer, int userVersion) -> {
+            CommandChanges builder = new CommandChanges(txnId);
+            new RecordConsumerAdapter<>(builder::deserializeNext).accept(segment, position, k, buffer, userVersion);
+            result.add(new DebugEntry(segment, position, builder));
+        });
+        return result;
+    }
+
+    @Override
+    public Command.Minimal loadMinimal(int commandStoreId, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore)
+    {
+        CommandChanges builder = CommandChanges.cleanupAndFilter(loadDiffs(commandStoreId, txnId, MINIMAL), redundantBefore, durableBefore);
+        return builder == null ? null : builder.asMinimal();
+    }
+
+    @Override
+    public Command.MinimalWithDeps loadMinimalWithDeps(int commandStoreId, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore)
+    {
+        CommandChanges builder = CommandChanges.cleanupAndFilter(loadDiffs(commandStoreId, txnId, MINIMAL_WITH_DEPS), redundantBefore, durableBefore);
+        return builder == null ? null : builder.asMinimalWithDeps();
+    }
+
+    private CommandChanges loadDiffs(int commandStoreId, TxnId txnId, Load load)
+    {
+        JournalKey key = new JournalKey(txnId, COMMAND_DIFF, commandStoreId);
+        CommandChanges builder = new CommandChanges(txnId, load);
+        readAll(key, builder::deserializeNext);
+        return builder;
+    }
+
+    @VisibleForTesting
+    public CommandChanges load(int commandStoreId, TxnId txnId)
+    {
+        return loadDiffs(commandStoreId, txnId, Load.ALL);
+    }
+
+    @Override
+    public List<TopologyUpdate> loadTopologies()
+    {
+        List<accord.api.Journal.TopologyUpdate> images = new ArrayList<>();
+        try (CloseableIterator<accord.api.Journal.TopologyUpdate> iter = new CloseableIterator<>()
+        {
+            final CloseableIterator<Journal.KeyRefs<JournalKey>> iter = keyIterator(TopologyRecord.journalKey(0L),
+                                                                                    TopologyRecord.journalKey(Timestamp.MAX_EPOCH),
+                                                                                    true, 0);
+            TopologyRecord.TopologyImage prev = null;
+
+            @Override
+            public boolean hasNext()
+            {
+                return iter.hasNext();
+            }
+
+            @Override
+            public accord.api.Journal.TopologyUpdate next()
+            {
+                Journal.KeyRefs<JournalKey> ref = iter.next();
+                MergeSerializers.TopologyMerger reader = readAll(ref.key());
+                if (reader.read().kind() == TopologyRecord.Kind.Repeat)
+                {
+                    if (prev == null)
+                    {
+                        logger.error("Encountered TopologyImage Repeat record for epoch {}, but no prior image record was found", ref.key().id.epoch());
+                        return null;
+                    }
+                    prev = reader.read().asImage(Invariants.nonNull(prev.getUpdate()));
+                }
+                else prev = reader.read();
+
+                return new accord.api.Journal.TopologyUpdate(prev.getUpdate().commandStores,
+                                                             prev.getUpdate().global);
+            }
+
+            @Override
+            public void close()
+            {
+                iter.close();
+            }
+        })
+        {
+            accord.api.Journal.TopologyUpdate prev = null;
+            while (iter.hasNext())
+            {
+                accord.api.Journal.TopologyUpdate next = iter.next();
+                if (next == null)
+                    continue;
+
+                Invariants.require(prev == null || next.global.epoch() > prev.global.epoch());
+                // Due to partial compaction, we can clean up only some of the old epochs, creating gaps. We skip these epochs here.
+                if (prev != null && next.global.epoch() > prev.global.epoch() + 1)
+                    images.clear();
+
+                images.add(next);
+                prev = next;
+            }
+        }
+        return images;
+    }
+
+    @Override
+    public void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush)
+    {
+        append(TopologyRecord.journalKey(topologyUpdate.global.epoch()),
+               newTopology(topologyUpdate),
+               onFlush);
+    }
+
+    @Override
+    public RedundantBefore loadRedundantBefore(int commandStoreId)
+    {
+        KeepFirst<RedundantBefore> accumulator = readLast(new JournalKey(TxnId.NONE, JournalKey.Type.REDUNDANT_BEFORE, commandStoreId));
+        return accumulator.get();
+    }
+
+    @Override
+    public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId)
+    {
+        KeepFirst<NavigableMap<TxnId, Ranges>> accumulator = readLast(new JournalKey(TxnId.NONE, JournalKey.Type.BOOTSTRAP_BEGAN_AT, commandStoreId));
+        return accumulator.get();
+    }
+
+    @Override
+    public NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId)
+    {
+        KeepFirst<NavigableMap<Timestamp, Ranges>> accumulator = readLast(new JournalKey(TxnId.NONE, JournalKey.Type.SAFE_TO_READ, commandStoreId));
+        return accumulator.get();
+    }
+
+    @Override
+    public CommandStores.RangesForEpoch loadRangesForEpoch(int commandStoreId)
+    {
+        KeepFirst<RangesForEpoch> accumulator = readLast(new JournalKey(TxnId.NONE, JournalKey.Type.RANGES_FOR_EPOCH, commandStoreId));
+        return accumulator.get();
+    }
+
+    @Override
+    public PersistentField.Persister<DurableBefore, DurableBefore> durableBeforePersister()
+    {
+        return new DurableBeforePersister(this);
+    }
+
+    @Override
+    public void saveStoreState(int commandStoreId, FieldUpdates fieldUpdates, Runnable onFlush)
+    {
+        RecordPointer pointer = null;
+        if (fieldUpdates.newRedundantBefore != null)
+            pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.REDUNDANT_BEFORE, commandStoreId), fieldUpdates.newRedundantBefore);
+        if (fieldUpdates.newBootstrapBeganAt != null)
+            pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.BOOTSTRAP_BEGAN_AT, commandStoreId), fieldUpdates.newBootstrapBeganAt);
+        if (fieldUpdates.newSafeToRead != null)
+            pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.SAFE_TO_READ, commandStoreId), fieldUpdates.newSafeToRead);
+        if (fieldUpdates.newRangesForEpoch != null)
+            pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.RANGES_FOR_EPOCH, commandStoreId), fieldUpdates.newRangesForEpoch);
+
+        if (onFlush == null)
+            return;
+
+        if (pointer != null)
+            segments.onDurable(pointer, onFlush);
+        else
+            onFlush.run();
+    }
+
+    public <BUILDER extends Merger> BUILDER readAll(JournalKey key)
+    {
+        Invariants.require(segments.isReadable());
+        BUILDER builder = (BUILDER) key.type.serializer.mergerFor();
+        builder.reset(key);
+        // TODO (expected): this can be further improved to avoid allocating lambdas
+        MergeSerializer<?, ? super BUILDER, ? extends BUILDER> serializer = (MergeSerializer<?, ? super BUILDER, ? extends BUILDER>) key.type.serializer;
+        // TODO (expected): for those where we store an image, read only the first entry we find in DESC order
+        readAll(key, (in, userVersion) -> serializer.deserialize(key, builder, in, userVersion));
+        return builder;
+    }
+
+    public <BUILDER extends Merger> BUILDER readLast(JournalKey key)
+    {
+        Invariants.require(segments.isReadable());
+        BUILDER builder = (BUILDER) key.type.serializer.mergerFor();
+        builder.reset(key);
+        // TODO (expected): this can be further improved to avoid allocating lambdas
+        MergeSerializer<?, ? super BUILDER, ? extends BUILDER> serializer = (MergeSerializer<?, ? super BUILDER, ? extends BUILDER>) key.type.serializer;
+        readLast(key, (in, userVersion) -> serializer.deserialize(key, builder, in, userVersion));
+        return builder;
+    }
+
+    public void forEachEntry(JournalKey key, Reader reader)
+    {
+        readAll(key, reader);
+    }
+
+    public interface Reader
+    {
+        void read(DataInputPlus input, Version userVersion) throws IOException;
+
+        default void read(ByteBuffer buffer, Version userVersion)
+        {
+            try (DataInputBuffer in = new DataInputBuffer(buffer, false))
+            {
+                read(in, userVersion);
+            }
+            catch (IOException e)
+            {
+                // can only throw if serializer is buggy or bytes got corrupted
+                throw new UncheckedIOException(e);
+            }
+        }
+    }
+
+    static class RecordConsumerAdapter<K> implements RecordConsumer<K>
+    {
+        protected final Reader reader;
+        private long prevSegment = Long.MAX_VALUE;
+        private long prevPosition = Long.MAX_VALUE;
+
+        RecordConsumerAdapter(Reader reader)
+        {
+            this.reader = reader;
+        }
+
+        @Override
+        public void accept(long segment, int position, K key, ByteBuffer buffer, int userVersion)
+        {
+            Invariants.require(segment <= prevSegment,
+                               "Records should always be iterated over in a reverse order, but segment %d was seen after %d while reading %s", segment, prevSegment, key);
+            Invariants.require(segment != prevSegment || position < prevPosition,
+                               "Records should always be iterated over in a reverse order, but position %d was seen after %d for segment %d while reading %s", position, prevPosition, segment, key);
+            reader.read(buffer, Version.fromVersion(userVersion));
+            prevSegment = segment;
+            prevPosition = position;
+        }
+    }
+
+    /**
+     * Perform a read from Journal table, followed by the reads from all journal segments.
+     * <p>
+     * When reading from journal segments, skip descriptors that were read from the table.
+     */
+    public void readAll(JournalKey key, Reader reader)
+    {
+        readAll(key, new RecordConsumerAdapter<>(reader));
+    }
+
+    public void readAll(JournalKey key, RecordConsumer<JournalKey> reader)
+    {
+        try (OpOrder.Group readOrder = table.readOrdering.start())
+        {
+            // SELECT segments first, to avoid missing segments due to races compacting segment->sstable
+            Segments<JournalKey, Object> segments = this.segments.segments();
+            try (TableRecordIterator table = TableRecordIterator.all(this.table, key, readOrder))
+            {
+                boolean hasTableData = table.advance();
+                long minSegment = hasTableData ? table.segment : Long.MIN_VALUE;
+                // First, read all journal entries newer than anything flushed into sstables
+                Journal.readAll(key, (segment, position, key1, buffer, userVersion) -> {
+                    if (segment > minSegment)
+                        reader.accept(segment, position, key1, buffer, userVersion);
+                }, readOrder, segments);
+
+                // Then, read SSTables
+                while (hasTableData)
+                {
+                    reader.accept(table.segment, table.offset, key, table.value, table.userVersion);
+                    hasTableData = table.advance();
+                }
+            }
+        }
+    }
+
+    public void readLast(JournalKey key, Reader reader)
+    {
+        readLast(key, new RecordConsumerAdapter<>(reader));
+    }
+
+    public void readLast(JournalKey key, RecordConsumer<JournalKey> reader)
+    {
+        try (OpOrder.Group readOrder = table.readOrdering.start())
+        {
+            Segments<JournalKey, Object> segments = this.segments.segments();
+            try (TableRecordIterator table = TableRecordIterator.all(this.table, key, readOrder))
+            {
+                boolean hasTableData = table.advance();
+                long minSegment = hasTableData ? table.segment : Long.MIN_VALUE;
+
+                class JournalReader implements RecordConsumer<JournalKey>
+                {
+                    boolean read;
+                    @Override
+                    public void accept(long segment, int position, JournalKey key, ByteBuffer buffer, int userVersion)
+                    {
+                        if (segment > minSegment)
+                        {
+                            reader.accept(segment, position, key, buffer, userVersion);
+                            read = true;
+                        }
+                    }
+                }
+
+                // First, read all journal entries newer than anything flushed into sstables
+                JournalReader journalReader = new JournalReader();
+                Journal.readLast(key, journalReader, readOrder, segments);
+
+                // Then, read SSTables, if we haven't found a record already
+                if (hasTableData && !journalReader.read)
+                    reader.accept(table.segment, table.offset, key, table.value, table.userVersion);
+            }
+        }
+    }
+
+    public CloseableIterator<Journal.KeyRefs<JournalKey>> keyIterator(@Nullable JournalKey min, @Nullable JournalKey max, boolean includeActive, long minSegment)
+    {
+        try (OpOrder.Group readOrder = this.readOrder.start())
+        {
+            return new TableAndSegmentKeyIterator<>(segments, table, min, max, includeActive, minSegment);
+        }
+    }
+
+    public void forEach(Consumer<JournalKey> consumer, boolean includeActive, long minSegment)
+    {
+        forEach(consumer, null, null, includeActive, minSegment);
+    }
+
+    public void forEach(Consumer<JournalKey> consumer, @Nullable JournalKey min, @Nullable JournalKey max, boolean includeActive, long minSegment)
+    {
+        try (CloseableIterator<Journal.KeyRefs<JournalKey>> iter = keyIterator(min, max, includeActive, minSegment))
+        {
+            while (iter.hasNext())
+            {
+                Journal.KeyRefs<JournalKey> ref = iter.next();
+                consumer.accept(ref.key());
+            }
+        }
+    }
+
+    public Compactor<JournalKey, Object> compactor()
+    {
+        return segments.compactor();
+    }
+
+    protected org.apache.cassandra.journal.SegmentCompactor<JournalKey, Object> compactor(ColumnFamilyStore cfs, Version userVersion)
+    {
+        if (rangeSearch == null)
+        {
+            Invariants.require(getAccord().range_index_mode != journal_sai, "range_index_mode is journal_sai, but the storage attached index was not found on initialisation");
+            return new SegmentCompactor<>(userVersion, cfs);
+        }
+
+        return rangeSearch.compactor(cfs, userVersion);
+    }
+
+    public void forceCompaction()
+    {
+        table.forceMajorCompaction();
+    }
+
+    @Override
+    public void purge(CommandStores commandStores, EpochSupplier minEpoch)
+    {
+        segments.closeCurrentSegmentForTestingIfNonEmpty();
+        segments.runCompactorForTesting();
+        forceCompaction();
+    }
+
+    public void replay(CommandStore commandStore, ReplayMode replayMode, long minSegmentId)
+    {
+        Long2LongHashMap minSegments = new Long2LongHashMap(0);
+        minSegments.put(commandStore.id(), minSegmentId);

Review Comment:
   this has been fixed already



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to