This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 0894665243 Ninja: remove .orig file from repository 0894665243 is described below commit 08946652434edbce38a6395e71d4068898ea13fa Author: Alex Petrov <oleksandr.pet...@gmail.com> AuthorDate: Tue May 13 11:19:56 2025 +0200 Ninja: remove .orig file from repository --- .../accord/SimulatedAccordCommandStore.java.orig | 419 --------------------- 1 file changed, 419 deletions(-) diff --git a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java.orig b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java.orig deleted file mode 100644 index f48156a3b3..0000000000 --- a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java.orig +++ /dev/null @@ -1,419 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.service.accord; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.function.BooleanSupplier; -import java.util.function.Function; -import java.util.function.Predicate; -import java.util.function.ToLongFunction; - -import accord.api.LocalListeners; -import accord.api.ProgressLog; -import accord.api.RemoteListeners; -import accord.api.RoutingKey; -import accord.impl.DefaultLocalListeners; -import accord.impl.SizeOfIntersectionSorter; -import accord.impl.TestAgent; -import accord.local.Command; -import accord.local.CommandStore; -import accord.local.CommandStores; -import accord.local.DurableBefore; -import accord.local.Node; -import accord.local.NodeCommandStoreService; -import accord.local.TimeService; -import accord.local.PreLoadContext; -import accord.local.SafeCommand; -import accord.local.SafeCommandStore; -import accord.messages.BeginRecovery; -import accord.messages.PreAccept; -import accord.messages.TxnRequest; -import accord.primitives.AbstractUnseekableKeys; -import accord.primitives.Ballot; -import accord.primitives.FullRoute; -import accord.primitives.Ranges; -import accord.primitives.Routable; -import accord.primitives.RoutableKey; -import accord.primitives.RoutingKeys; -import accord.primitives.Timestamp; -import accord.primitives.Txn; -import accord.primitives.TxnId; -import accord.primitives.Unseekables; -import accord.topology.Topologies; -import accord.topology.Topology; -import accord.utils.Gens; -import accord.utils.RandomSource; -import accord.utils.async.AsyncChains; -import accord.utils.async.AsyncResult; -import org.apache.cassandra.concurrent.ExecutorFactory; -import org.apache.cassandra.concurrent.ScheduledExecutorPlus; -import org.apache.cassandra.concurrent.SimulatedExecutorFactory; -import org.apache.cassandra.concurrent.Stage; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DataRange; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.db.filter.ColumnFilter; -import org.apache.cassandra.db.memtable.Memtable; -import org.apache.cassandra.metrics.AccordCacheMetrics; -import org.apache.cassandra.tcm.ClusterMetadata; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Generators; -import org.apache.cassandra.utils.Pair; -import org.assertj.core.api.Assertions; - -import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; -import static org.apache.cassandra.db.ColumnFamilyStore.FlushReason.UNIT_TESTS; -import static org.apache.cassandra.schema.SchemaConstants.ACCORD_KEYSPACE_NAME; -import static org.apache.cassandra.utils.AccordGenerators.fromQT; - -public class SimulatedAccordCommandStore implements AutoCloseable -{ - private final List<Throwable> failures = new ArrayList<>(); - private final SimulatedExecutorFactory globalExecutor; - private final CommandStore.EpochUpdateHolder updateHolder; - private final BooleanSupplier shouldEvict, shouldFlush, shouldCompact; - - public final NodeCommandStoreService storeService; - public final AccordCommandStore store; - public final Node.Id nodeId; - public final Topology topology; - public final Topologies topologies; - public final MockJournal journal; - public final ScheduledExecutorPlus unorderedScheduled; - public final List<String> evictions = new ArrayList<>(); - public Predicate<Throwable> ignoreExceptions = ignore -> false; - - public SimulatedAccordCommandStore(RandomSource rs) - { - globalExecutor = new SimulatedExecutorFactory(rs.fork(), fromQT(Generators.TIMESTAMP_GEN.map(java.sql.Timestamp::getTime)).mapToLong(TimeUnit.MILLISECONDS::toNanos).next(rs), failures::add); - this.unorderedScheduled = globalExecutor.scheduled("ignored"); - ExecutorFactory.Global.unsafeSet(globalExecutor); - Stage.READ.unsafeSetExecutor(unorderedScheduled); - Stage.MUTATION.unsafeSetExecutor(unorderedScheduled); - for (Stage stage : Arrays.asList(Stage.MISC, Stage.ACCORD_MIGRATION, Stage.READ, Stage.MUTATION)) - stage.unsafeSetExecutor(globalExecutor.configureSequential("ignore").build()); - - this.updateHolder = new CommandStore.EpochUpdateHolder(); - this.nodeId = AccordTopology.tcmIdToAccord(ClusterMetadata.currentNullable().myNodeId()); - this.storeService = new NodeCommandStoreService() - { - private final ToLongFunction<TimeUnit> elapsed = TimeService.elapsedWrapperFromNonMonotonicSource(TimeUnit.NANOSECONDS, this::now); - - @Override public DurableBefore durableBefore() { return DurableBefore.EMPTY; } - - @Override - public Timestamp uniqueNow() - { - return uniqueNow(Timestamp.NONE); - } - - @Override - public Node.Id id() - { - return nodeId; - } - - @Override - public long epoch() - { - return ClusterMetadata.current().epoch.getEpoch(); - } - - @Override - public long now() - { - return globalExecutor.nanoTime(); - } - - @Override - public long elapsed(TimeUnit unit) - { - return elapsed.applyAsLong(unit); - } - - @Override - public Timestamp uniqueNow(Timestamp atLeast) - { - var now = Timestamp.fromValues(epoch(), now(), nodeId); - if (now.compareTo(atLeast) < 0) - throw new UnsupportedOperationException(); - return now; - } - }; - - AccordStateCache stateCache = new AccordStateCache(Stage.READ.executor(), Stage.MUTATION.executor(), 8 << 20, new AccordStateCacheMetrics("test")); - this.journal = new MockJournal(); - this.store = new AccordCommandStore(0, - storeService, - new TestAgent.RethrowAgent() - { - @Override - public long preAcceptTimeout() - { - return Long.MAX_VALUE; - } - - @Override - public void onUncaughtException(Throwable t) - { - if (ignoreExceptions.test(t)) return; - super.onUncaughtException(t); - } - }, - null, - ignore -> new ProgressLog.NoOpProgressLog(), - cs -> new DefaultLocalListeners(new RemoteListeners.NoOpRemoteListeners(), new DefaultLocalListeners.NotifySink() - { - @Override public void notify(SafeCommandStore safeStore, SafeCommand safeCommand, TxnId listener) {} - @Override public boolean notify(SafeCommandStore safeStore, SafeCommand safeCommand, LocalListeners.ComplexListener listener) { return false; } - }), - updateHolder, - journal, -<<<<<<< HEAD - new AccordCommandStore.CommandStoreExecutor(stateCache, executorFactory().sequential(CommandStore.class.getSimpleName() + '[' + 0 + ']'), Thread.currentThread().getId())); -======= - new AccordCommandStoreExecutor(new AccordStateCacheMetrics("test"), executorFactory().sequential(CommandStore.class.getSimpleName() + '[' + 0 + ']'), agent)); - - this.topology = AccordTopology.createAccordTopology(ClusterMetadata.current()); - this.topologies = new Topologies.Single(SizeOfIntersectionSorter.SUPPLIER, topology); - var rangesForEpoch = new CommandStores.RangesForEpoch(topology.epoch(), topology.ranges(), store); - //store.unsafeSetRangesForEpoch(rangesForEpoch); - updateHolder.add(topology.epoch(), rangesForEpoch, topology.ranges()); - updateHolder.updateGlobal(topology.ranges()); - - shouldEvict = boolSource(rs.fork()); - shouldFlush = boolSource(rs.fork()); - shouldCompact = boolSource(rs.fork()); ->>>>>>> 04671b52ef (Set ranges for epoch in AccordCommandStore via super call, not by fixing up Simulated store) - - store.cache().instances().forEach(i -> { - i.register(new AccordStateCache.Listener() - { - @Override - public void onAdd(AccordCachingState state) - { - } - - @Override - public void onRelease(AccordCachingState state) - { - } - - @Override - public void onEvict(AccordCachingState state) - { - evictions.add(i + " evicted " + state); - } - }); - }); - - this.topology = AccordTopology.createAccordTopology(ClusterMetadata.current()); - this.topologies = new Topologies.Single(SizeOfIntersectionSorter.SUPPLIER, topology); - var rangesForEpoch = new CommandStores.RangesForEpoch(topology.epoch(), topology.ranges(), store); - updateHolder.add(topology.epoch(), rangesForEpoch, topology.ranges()); - updateHolder.updateGlobal(topology.ranges()); - - shouldEvict = boolSource(rs.fork()); - shouldFlush = boolSource(rs.fork()); - shouldCompact = boolSource(rs.fork()); - } - - private static BooleanSupplier boolSource(RandomSource rs) - { - var gen = Gens.bools().mixedDistribution().next(rs); - return () -> gen.next(rs); - } - - public TxnId nextTxnId(Txn.Kind kind, Routable.Domain domain) - { - return new TxnId(storeService.epoch(), storeService.now(), kind, domain, nodeId); - } - - public void maybeCacheEvict(Unseekables<?> keysOrRanges) - { - switch (keysOrRanges.domain()) - { - case Key: - maybeCacheEvict((AbstractUnseekableKeys) keysOrRanges, Ranges.EMPTY); - break; - case Range: - maybeCacheEvict(RoutingKeys.EMPTY, (Ranges) keysOrRanges); - break; - default: - throw new UnsupportedOperationException("Unknown domain: " + keysOrRanges.domain()); - } - } - - public void maybeCacheEvict(Unseekables<RoutingKey> keys, Ranges ranges) - { - AccordStateCache cache = store.cache(); - cache.forEach(state -> { - Class<?> keyType = state.key().getClass(); - if (TxnId.class.equals(keyType)) - { - Command command = (Command) state.state().get(); - if (command != null && command.known().definition.isKnown() - && (command.partialTxn().keys().intersects(keys) || ranges.intersects(command.partialTxn().keys())) - && shouldEvict.getAsBoolean()) - cache.maybeEvict(state); - } - else if (RoutableKey.class.isAssignableFrom(keyType)) - { - RoutableKey key = (RoutableKey) state.key(); - if ((keys.contains(key) || ranges.intersects(key)) - && shouldEvict.getAsBoolean()) - cache.maybeEvict(state); - } - else - { - throw new AssertionError("Unexpected key type: " + state.key().getClass()); - } - }); - - for (var store : Keyspace.open(ACCORD_KEYSPACE_NAME).getColumnFamilyStores()) - { - Memtable memtable = store.getCurrentMemtable(); - if (memtable.partitionCount() == 0 || !intersects(store, memtable, keys, ranges)) - continue; - if (shouldFlush.getAsBoolean()) - store.forceBlockingFlush(UNIT_TESTS); - } - for (var store : Keyspace.open(ACCORD_KEYSPACE_NAME).getColumnFamilyStores()) - { - if (store.getLiveSSTables().size() > 5 && shouldCompact.getAsBoolean()) - { - // compaction no-op since auto-compaction is disabled... so need to enable quickly - store.enableAutoCompaction(); - try - { - FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(store)); - } - finally - { - store.disableAutoCompaction(); - } - } - } - } - - private static boolean intersects(ColumnFamilyStore store, Memtable memtable, Unseekables<RoutingKey> keys, Ranges ranges) - { - if (keys.isEmpty() && ranges.isEmpty()) // shouldn't happen, but just in case... - return false; - switch (store.name) - { - case "commands_for_key": - // pk = (store_id, routing_key) - // since this is simulating a single store, store_id is a constant, so check key - try (var it = memtable.partitionIterator(ColumnFilter.NONE, DataRange.allData(store.getPartitioner()), null)) - { - while (it.hasNext()) - { - var key = AccordKeyspace.CommandsForKeysAccessor.getKey(it.next().partitionKey()); - if (keys.contains(key) || ranges.intersects(key)) - return true; - } - } - break; - } - return false; - } - - public void checkFailures() - { - if (Thread.interrupted()) - failures.add(new InterruptedException()); - if (failures.isEmpty()) return; - AssertionError error = new AssertionError("Unexpected exceptions found"); - failures.forEach(error::addSuppressed); - failures.clear(); - throw error; - } - - public <T> T process(TxnRequest<T> request) throws ExecutionException, InterruptedException - { - return process(request, request::apply); - } - - public <T> T process(PreLoadContext loadCtx, Function<? super SafeCommandStore, T> function) throws ExecutionException, InterruptedException - { - var result = processAsync(loadCtx, function); - processAll(); - return AsyncChains.getBlocking(result); - } - - public <T> AsyncResult<T> processAsync(TxnRequest<T> request) - { - return processAsync(request, request::apply); - } - - public <T> AsyncResult<T> processAsync(PreLoadContext loadCtx, Function<? super SafeCommandStore, T> function) - { - return store.submit(loadCtx, function).beginAsResult(); - } - - public Pair<TxnId, AsyncResult<PreAccept.PreAcceptOk>> enqueuePreAccept(Txn txn, FullRoute<?> route) - { - TxnId txnId = nextTxnId(txn.kind(), txn.keys().domain()); - PreAccept preAccept = new PreAccept(nodeId, topologies, txnId, txn, route); - return Pair.create(txnId, processAsync(preAccept, safe -> { - var reply = preAccept.apply(safe); - Assertions.assertThat(reply.isOk()).isTrue(); - return (PreAccept.PreAcceptOk) reply; - })); - } - - public Pair<TxnId, AsyncResult<BeginRecovery.RecoverOk>> enqueueBeginRecovery(Txn txn, FullRoute<?> route) - { - TxnId txnId = nextTxnId(txn.kind(), txn.keys().domain()); - Ballot ballot = Ballot.fromValues(storeService.epoch(), storeService.now(), nodeId); - BeginRecovery br = new BeginRecovery(nodeId, topologies, txnId, null, txn, route, ballot); - - return Pair.create(txnId, processAsync(br, safe -> { - var reply = br.apply(safe); - Assertions.assertThat(reply.isOk()).isTrue(); - return (BeginRecovery.RecoverOk) reply; - }).beginAsResult()); - } - - public void processAll() - { - while (processOne()) - { - } - } - - private boolean processOne() - { - boolean result = globalExecutor.processOne(); - checkFailures(); - return result; - } - - @Override - public void close() throws Exception - { - store.shutdown(); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org