This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8226b2d7 CEP-15: (C*) Accord message processing should avoid being 
passed on to a Stage and run directly in the messaging handler
8226b2d7 is described below

commit 8226b2d7759319d7a0b0c823ab09b4344c5423f7
Author: David Capwell <[email protected]>
AuthorDate: Thu Apr 27 11:10:37 2023 -0700

    CEP-15: (C*) Accord message processing should avoid being passed on to a 
Stage and run directly in the messaging handler
    
    patch by David Capwell; reviewed by Ariel Weisberg, Benedict Elliott Smith 
for CASSANDRA-18364
---
 .build/checkstyle/checkstyle.xml                   |  36 +++++++
 .build/checkstyle/suppressions.xml                 |  24 +++++
 .circleci/config.yml                               |   8 +-
 .../src/main/java/accord/api/MessageSink.java      |   3 +-
 .../src/main/java/accord/api/RoutingKey.java       |   5 -
 accord-core/src/main/java/accord/api/Update.java   |   1 -
 .../main/java/accord/coordinate/Coordinate.java    |   3 +-
 .../src/main/java/accord/coordinate/Persist.java   |   4 +-
 .../src/main/java/accord/coordinate/Propose.java   |   1 -
 .../java/accord/impl/InMemoryCommandStore.java     |  89 ++++++++-------
 .../java/accord/impl/InMemoryCommandStores.java    |  17 +--
 .../MessageSink.java => local/AgentExecutor.java}  |  21 ++--
 .../src/main/java/accord/local/CommandStore.java   | 119 +++++++++++++++++++--
 .../src/main/java/accord/local/CommandStores.java  |  40 +++++--
 accord-core/src/main/java/accord/local/Node.java   |  46 ++++++--
 .../main/java/accord/local/SafeCommandStore.java   |   7 ++
 .../src/main/java/accord/messages/ReadData.java    |   2 +-
 .../main/java/accord/messages/SafeCallback.java    |  91 ++++++++++++++++
 .../src/main/java/accord/primitives/KeyDeps.java   |   1 -
 .../java/accord/primitives/PartialRangeRoute.java  |   1 -
 .../main/java/accord/primitives/PartialTxn.java    |   1 -
 .../src/main/java/accord/primitives/RangeDeps.java |   2 -
 .../src/main/java/accord/primitives/Routables.java |   4 -
 .../src/main/java/accord/primitives/Txn.java       |   2 -
 .../main/java/accord/topology/TopologyManager.java |  16 ++-
 .../main/java/accord/utils/async/AsyncChain.java   |  25 +++++
 .../main/java/accord/utils/async/AsyncChains.java  |  61 +++++++++++
 .../src/test/java/accord/burn/BurnTest.java        |   8 +-
 .../accord/burn/BurnTestConfigurationService.java  |  23 ++--
 .../SimulationException.java}                      |  36 +++----
 .../src/test/java/accord/burn/TopologyUpdates.java |  34 +++---
 .../java/accord/coordinate/CoordinateTest.java     |   1 +
 .../java/accord/coordinate/TopologyChangeTest.java |   2 -
 .../coordinate/tracking/ReadTrackerTest.java       |   1 -
 .../coordinate/tracking/TrackerReconciler.java     |  11 +-
 .../src/test/java/accord/impl/basic/Cluster.java   |  37 +++----
 .../accord/impl/basic/DelayedCommandStores.java    |  90 +++++++++++++++-
 .../src/test/java/accord/impl/basic/NodeSink.java  |  39 ++-----
 .../basic/SimulatedDelayedExecutorService.java     |  13 +--
 .../accord/impl/basic/TaskExecutorService.java     |   4 +-
 .../java/accord/impl/basic/UniformRandomQueue.java |  99 -----------------
 .../src/test/java/accord/impl/list/ListData.java   |   1 -
 .../test/java/accord/impl/list/ListRequest.java    |  32 +++---
 .../src/test/java/accord/impl/list/ListStore.java  |   2 -
 .../test/java/accord/impl/mock/MockCluster.java    |  28 ++---
 .../src/test/java/accord/impl/mock/MockStore.java  |   2 -
 .../src/test/java/accord/impl/mock/Network.java    |   5 +-
 .../accord/impl/mock/RecordingMessageSink.java     |   5 +-
 .../java/accord/impl/mock/SimpleMessageSink.java   |   7 +-
 .../src/test/java/accord/utils/MessageTask.java    |  31 +++---
 .../src/main/java/accord/maelstrom/Cluster.java    |  27 ++---
 .../src/main/java/accord/maelstrom/Json.java       |  13 ++-
 .../main/java/accord/maelstrom/MaelstromQuery.java |   1 -
 .../src/main/java/accord/maelstrom/Main.java       |   4 +-
 .../src/main/groovy/accord.java-conventions.gradle |   6 ++
 55 files changed, 784 insertions(+), 408 deletions(-)

diff --git a/.build/checkstyle/checkstyle.xml b/.build/checkstyle/checkstyle.xml
new file mode 100644
index 00000000..c8052136
--- /dev/null
+++ b/.build/checkstyle/checkstyle.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<!DOCTYPE module PUBLIC
+          "-//Checkstyle//DTD Checkstyle Configuration 1.3//EN"
+          "https://checkstyle.org/dtds/configuration_1_3.dtd">
+
+<module name="Checker">
+  <property name="severity" value="error"/>
+
+  <property name="fileExtensions" value="java, properties, xml"/>
+
+  <module name="BeforeExecutionExclusionFileFilter">
+    <property name="fileNamePattern" value="module\-info\.java$"/>
+  </module>
+
+  <module name="TreeWalker">
+    <module name="RedundantImport"/>
+    <module name="UnusedImports"/>
+  </module>
+
+</module>
diff --git a/.build/checkstyle/suppressions.xml 
b/.build/checkstyle/suppressions.xml
new file mode 100644
index 00000000..230c808c
--- /dev/null
+++ b/.build/checkstyle/suppressions.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!DOCTYPE suppressions PUBLIC
+        "-//Checkstyle//DTD SuppressionFilter Configuration 1.1//EN"
+        "https://checkstyle.org/dtds/suppressions_1_1.dtd">
+
+<suppressions>
+</suppressions>
diff --git a/.circleci/config.yml b/.circleci/config.yml
index c2070aa7..2f59771f 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -81,7 +81,7 @@ jobs:
           key: v1-dependencies-{{ checksum "build.gradle" }}-{{ checksum 
"accord-core/build.gradle" }}-{{ checksum "accord-maelstrom/build.gradle" }}-{{ 
checksum "gradle/wrapper/gradle-wrapper.properties" }}
 
       # run tests!
-      - run: ./gradlew test
+      - run: ./gradlew check
       - run:
           name: Save Test Results
           command: |
@@ -90,3 +90,9 @@ jobs:
           when: always
       - store_test_results:
           path: test-results
+
+workflows:
+  version: 2
+  main:
+    jobs:
+      - build: {}
diff --git a/accord-core/src/main/java/accord/api/MessageSink.java 
b/accord-core/src/main/java/accord/api/MessageSink.java
index 38ac946d..ee4d681e 100644
--- a/accord-core/src/main/java/accord/api/MessageSink.java
+++ b/accord-core/src/main/java/accord/api/MessageSink.java
@@ -18,6 +18,7 @@
 
 package accord.api;
 
+import accord.local.AgentExecutor;
 import accord.local.Node.Id;
 import accord.messages.Callback;
 import accord.messages.Reply;
@@ -27,6 +28,6 @@ import accord.messages.Request;
 public interface MessageSink
 {
     void send(Id to, Request request);
-    void send(Id to, Request request, Callback callback);
+    void send(Id to, Request request, AgentExecutor executor, Callback 
callback);
     void reply(Id replyingToNode, ReplyContext replyContext, Reply reply);
 }
diff --git a/accord-core/src/main/java/accord/api/RoutingKey.java 
b/accord-core/src/main/java/accord/api/RoutingKey.java
index 44b24f4f..4ae65067 100644
--- a/accord-core/src/main/java/accord/api/RoutingKey.java
+++ b/accord-core/src/main/java/accord/api/RoutingKey.java
@@ -21,11 +21,6 @@ package accord.api;
 import accord.primitives.Range;
 import accord.primitives.RoutableKey;
 import accord.primitives.Unseekable;
-import accord.utils.ArrayBuffers;
-
-import java.util.Arrays;
-
-import static accord.utils.ArrayBuffers.cachedRoutingKeys;
 
 public interface RoutingKey extends Unseekable, RoutableKey
 {
diff --git a/accord-core/src/main/java/accord/api/Update.java 
b/accord-core/src/main/java/accord/api/Update.java
index b3eb3875..f4092d3b 100644
--- a/accord-core/src/main/java/accord/api/Update.java
+++ b/accord-core/src/main/java/accord/api/Update.java
@@ -19,7 +19,6 @@
 package accord.api;
 
 import accord.primitives.Ranges;
-import accord.primitives.Keys;
 import accord.primitives.Seekables;
 
 import javax.annotation.Nullable;
diff --git a/accord-core/src/main/java/accord/coordinate/Coordinate.java 
b/accord-core/src/main/java/accord/coordinate/Coordinate.java
index 839f41c9..f08dd399 100644
--- a/accord-core/src/main/java/accord/coordinate/Coordinate.java
+++ b/accord-core/src/main/java/accord/coordinate/Coordinate.java
@@ -72,7 +72,8 @@ public class Coordinate extends 
AsyncResults.SettableResult<Result> implements C
     {
         // TODO (desired, efficiency): consider sending only to electorate of 
most recent topology (as only these PreAccept votes matter)
         // note that we must send to all replicas of old topology, as 
electorate may not be reachable
-        node.send(tracker.nodes(), to -> new PreAccept(to, 
tracker.topologies(), txnId, txn, route), this);
+        node.send(tracker.nodes(), to -> new PreAccept(to, 
tracker.topologies(), txnId, txn, route),
+                  node.commandStores().select(route.homeKey()), this);
     }
 
     public static AsyncResult<Result> coordinate(Node node, TxnId txnId, Txn 
txn, FullRoute<?> route)
diff --git a/accord-core/src/main/java/accord/coordinate/Persist.java 
b/accord-core/src/main/java/accord/coordinate/Persist.java
index 076cc669..926697f4 100644
--- a/accord-core/src/main/java/accord/coordinate/Persist.java
+++ b/accord-core/src/main/java/accord/coordinate/Persist.java
@@ -94,13 +94,13 @@ public class Persist implements Callback<ApplyReply>
                     {
                         // TODO (low priority, consider, efficiency): send to 
non-home replicas also, so they may clear their log more easily?
                         Shard homeShard = 
node.topology().forEpochIfKnown(route.homeKey(), txnId.epoch());
-                        node.send(homeShard, new InformHomeDurable(txnId, 
route.homeKey(), executeAt, Durable, persistedOn));
+                        node.send(homeShard, new InformHomeDurable(txnId, 
route.homeKey(), executeAt, Durable, new HashSet<>(persistedOn)));
                         isDone = true;
                     }
                     else if (!tracker.hasInFlight() && !tracker.hasFailures())
                     {
                         Shard homeShard = 
node.topology().forEpochIfKnown(route.homeKey(), txnId.epoch());
-                        node.send(homeShard, new InformHomeDurable(txnId, 
route.homeKey(), executeAt, Universal, persistedOn));
+                        node.send(homeShard, new InformHomeDurable(txnId, 
route.homeKey(), executeAt, Universal, new HashSet<>(persistedOn)));
                     }
                 }
                 break;
diff --git a/accord-core/src/main/java/accord/coordinate/Propose.java 
b/accord-core/src/main/java/accord/coordinate/Propose.java
index 6a9f6468..62d4cd61 100644
--- a/accord-core/src/main/java/accord/coordinate/Propose.java
+++ b/accord-core/src/main/java/accord/coordinate/Propose.java
@@ -38,7 +38,6 @@ import accord.local.Node;
 import accord.local.Node.Id;
 import accord.messages.Accept;
 import accord.messages.Accept.AcceptReply;
-import accord.utils.Invariants;
 
 import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Fail;
 import static accord.coordinate.tracking.RequestStatus.Failed;
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java 
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index ff34236d..c7daf962 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -34,7 +34,6 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 import java.util.*;
 import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
@@ -45,17 +44,10 @@ import static accord.local.SafeCommandStore.TestKind.Ws;
 import static accord.local.Status.*;
 import static accord.primitives.Routables.Slice.Minimal;
 
-public abstract class InMemoryCommandStore implements CommandStore
+public abstract class InMemoryCommandStore extends CommandStore
 {
     private static final Logger logger = 
LoggerFactory.getLogger(InMemoryCommandStore.class);
 
-    private final int id;
-    private final NodeTimeService time;
-    private final Agent agent;
-    private final DataStore store;
-    private final ProgressLog progressLog;
-    private final RangesForEpochHolder rangesForEpochHolder;
-
     private final NavigableMap<TxnId, GlobalCommand> commands = new 
TreeMap<>();
     private final NavigableMap<RoutableKey, GlobalCommandsForKey> 
commandsForKey = new TreeMap<>();
     private final CFKLoader cfkLoader = new CFKLoader();
@@ -67,12 +59,7 @@ public abstract class InMemoryCommandStore implements 
CommandStore
 
     public InMemoryCommandStore(int id, NodeTimeService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpochHolder 
rangesForEpochHolder)
     {
-        this.id = id;
-        this.time = time;
-        this.agent = agent;
-        this.store = store;
-        this.progressLog = progressLogFactory.create(this);
-        this.rangesForEpochHolder = rangesForEpochHolder;
+        super(id, time, agent, store, progressLogFactory, 
rangesForEpochHolder);
     }
 
     @Override
@@ -381,7 +368,6 @@ public abstract class InMemoryCommandStore implements 
CommandStore
     protected <T> T executeInContext(InMemoryCommandStore commandStore, 
PreLoadContext context, Function<? super SafeCommandStore, T> function)
     {
         return executeInContext(commandStore, context, function, true);
-
     }
 
     protected <T> void executeInContext(InMemoryCommandStore commandStore, 
PreLoadContext context, Function<? super SafeCommandStore, T> function, 
BiConsumer<? super T, Throwable> callback)
@@ -398,6 +384,14 @@ public abstract class InMemoryCommandStore implements 
CommandStore
         }
     }
 
+    @Override
+    public String toString()
+    {
+        return getClass().getSimpleName() + "{" +
+               "id=" + id +
+               '}';
+    }
+
     class RangeCommand
     {
         final GlobalCommand command;
@@ -769,14 +763,16 @@ public abstract class InMemoryCommandStore implements 
CommandStore
             active = queue.poll();
             while (active != null)
             {
-                try
-                {
-                    active.run();
-                }
-                catch (Throwable t)
-                {
-                    logger.error("Uncaught exception", t);
-                }
+                this.unsafeRunIn(() -> {
+                    try
+                    {
+                        active.run();
+                    }
+                    catch (Throwable t)
+                    {
+                        logger.error("Uncaught exception", t);
+                    }
+                });
                 active = queue.poll();
             }
         }
@@ -789,6 +785,12 @@ public abstract class InMemoryCommandStore implements 
CommandStore
             maybeRun();
         }
 
+        @Override
+        public boolean inStore()
+        {
+            return CommandStore.maybeCurrent() == this;
+        }
+
         @Override
         public AsyncChain<Void> execute(PreLoadContext context, Consumer<? 
super SafeCommandStore> consumer)
         {
@@ -837,44 +839,39 @@ public abstract class InMemoryCommandStore implements 
CommandStore
 
     public static class SingleThread extends InMemoryCommandStore
     {
-        private final AtomicReference<Thread> expectedThread = new 
AtomicReference<>();
+        private Thread thread; // when run in the executor this will be 
non-null, null implies not running in this store
         private final ExecutorService executor;
 
         public SingleThread(int id, NodeTimeService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpochHolder 
rangesForEpochHolder)
         {
-            this(id, time, agent, store, progressLogFactory, 
rangesForEpochHolder, Executors.newSingleThreadExecutor(r -> {
+            super(id, time, agent, store, progressLogFactory, 
rangesForEpochHolder);
+            this.executor = Executors.newSingleThreadExecutor(r -> {
                 Thread thread = new Thread(r);
                 thread.setName(CommandStore.class.getSimpleName() + '[' + 
time.id() + ']');
                 return thread;
-            }));
-        }
-
-        private SingleThread(int id, NodeTimeService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpochHolder 
rangesForEpochHolder, ExecutorService executor)
-        {
-            super(id, time, agent, store, progressLogFactory, 
rangesForEpochHolder);
-            this.executor = executor;
-        }
-
-        public static CommandStore.Factory factory(ExecutorService executor)
-        {
-            return (id, time, agent, store, progressLogFactory, 
rangesForEpoch) -> new SingleThread(id, time, agent, store, progressLogFactory, 
rangesForEpoch, executor);
+            });
+            // "this" is leaked before constructor is completed, but since all 
fields are "final" and set before "this"
+            // is leaked, then visibility should not be an issue.
+            executor.execute(() -> thread = Thread.currentThread());
+            executor.execute(() -> CommandStore.register(this));
         }
 
         void assertThread()
         {
             Thread current = Thread.currentThread();
-            Thread expected;
-            while (true)
-            {
-                expected = expectedThread.get();
-                if (expected != null)
-                    break;
-                expectedThread.compareAndSet(null, Thread.currentThread());
-            }
+            Thread expected = thread;
+            if (expected == null)
+                throw new IllegalStateException(String.format("Command store 
called from wrong thread; unexpected %s", current));
             if (expected != current)
                 throw new IllegalStateException(String.format("Command store 
called from the wrong thread. Expected %s, got %s", expected, current));
         }
 
+        @Override
+        public boolean inStore()
+        {
+            return thread == Thread.currentThread();
+        }
+
         @Override
         public AsyncChain<Void> execute(PreLoadContext context, Consumer<? 
super SafeCommandStore> consumer)
         {
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java 
b/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java
index 163318a0..5fe87a56 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java
@@ -23,35 +23,36 @@ import accord.api.Agent;
 import accord.api.DataStore;
 import accord.api.ProgressLog;
 import accord.local.CommandStore;
+import accord.utils.RandomSource;
 
 public class InMemoryCommandStores
 {
     public static class Synchronized extends CommandStores
     {
-        public Synchronized(NodeTimeService time, Agent agent, DataStore 
store, ShardDistributor shardDistributor, ProgressLog.Factory 
progressLogFactory)
+        public Synchronized(NodeTimeService time, Agent agent, DataStore 
store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory)
         {
-            super(time, agent, store, shardDistributor, progressLogFactory, 
InMemoryCommandStore.Synchronized::new);
+            super(time, agent, store, random, shardDistributor, 
progressLogFactory, InMemoryCommandStore.Synchronized::new);
         }
     }
 
     public static class SingleThread extends CommandStores
     {
-        public SingleThread(NodeTimeService time, Agent agent, DataStore 
store, ShardDistributor shardDistributor, ProgressLog.Factory 
progressLogFactory)
+        public SingleThread(NodeTimeService time, Agent agent, DataStore 
store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory)
         {
-            super(time, agent, store, shardDistributor, progressLogFactory, 
InMemoryCommandStore.SingleThread::new);
+            super(time, agent, store, random, shardDistributor, 
progressLogFactory, InMemoryCommandStore.SingleThread::new);
         }
 
-        public SingleThread(NodeTimeService time, Agent agent, DataStore 
store, ShardDistributor shardDistributor, ProgressLog.Factory 
progressLogFactory, CommandStore.Factory shardFactory)
+        public SingleThread(NodeTimeService time, Agent agent, DataStore 
store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory, CommandStore.Factory shardFactory)
         {
-            super(time, agent, store, shardDistributor, progressLogFactory, 
shardFactory);
+            super(time, agent, store, random, shardDistributor, 
progressLogFactory, shardFactory);
         }
     }
 
     public static class Debug extends InMemoryCommandStores.SingleThread
     {
-        public Debug(NodeTimeService time, Agent agent, DataStore store, 
ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory)
+        public Debug(NodeTimeService time, Agent agent, DataStore store, 
RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory 
progressLogFactory)
         {
-            super(time, agent, store, shardDistributor, progressLogFactory, 
InMemoryCommandStore.Debug::new);
+            super(time, agent, store, random, shardDistributor, 
progressLogFactory, InMemoryCommandStore.Debug::new);
         }
     }
 }
diff --git a/accord-core/src/main/java/accord/api/MessageSink.java 
b/accord-core/src/main/java/accord/local/AgentExecutor.java
similarity index 68%
copy from accord-core/src/main/java/accord/api/MessageSink.java
copy to accord-core/src/main/java/accord/local/AgentExecutor.java
index 38ac946d..e304cc21 100644
--- a/accord-core/src/main/java/accord/api/MessageSink.java
+++ b/accord-core/src/main/java/accord/local/AgentExecutor.java
@@ -16,17 +16,18 @@
  * limitations under the License.
  */
 
-package accord.api;
+package accord.local;
 
-import accord.local.Node.Id;
-import accord.messages.Callback;
-import accord.messages.Reply;
-import accord.messages.ReplyContext;
-import accord.messages.Request;
+import accord.api.Agent;
+import accord.utils.async.AsyncExecutor;
 
-public interface MessageSink
+public interface AgentExecutor extends AsyncExecutor
 {
-    void send(Id to, Request request);
-    void send(Id to, Request request, Callback callback);
-    void reply(Id replyingToNode, ReplyContext replyContext, Reply reply);
+    Agent agent();
+
+    @Override
+    default void execute(Runnable command)
+    {
+        submit(command).begin(agent());
+    }
 }
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java 
b/accord-core/src/main/java/accord/local/CommandStore.java
index 479f5817..d84794ab 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -23,17 +23,19 @@ import accord.api.ProgressLog;
 import accord.api.DataStore;
 import accord.local.CommandStores.RangesForEpochHolder;
 import accord.utils.async.AsyncChain;
-import accord.utils.async.AsyncExecutor;
 
+import java.util.concurrent.Callable;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
+import javax.annotation.Nullable;
+
 /**
  * Single threaded internal shard of accord transaction metadata
  */
-public interface CommandStore extends AsyncExecutor
+public abstract class CommandStore implements AgentExecutor
 {
-    interface Factory
+    public interface Factory
     {
         CommandStore create(int id,
                             NodeTimeService time,
@@ -43,16 +45,113 @@ public interface CommandStore extends AsyncExecutor
                             RangesForEpochHolder rangesForEpoch);
     }
 
-    int id();
-    Agent agent();
-    AsyncChain<Void> execute(PreLoadContext context, Consumer<? super 
SafeCommandStore> consumer);
-    <T> AsyncChain<T> submit(PreLoadContext context, Function<? super 
SafeCommandStore, T> apply);
+    private static final ThreadLocal<CommandStore> CURRENT_STORE = new 
ThreadLocal<>();
+
+    protected final int id;
+    protected final NodeTimeService time;
+    protected final Agent agent;
+    protected final DataStore store;
+    protected final ProgressLog progressLog;
+    protected final RangesForEpochHolder rangesForEpochHolder;
+
+    protected CommandStore(int id, NodeTimeService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpochHolder 
rangesForEpochHolder)
+    {
+        this.id = id;
+        this.time = time;
+        this.agent = agent;
+        this.store = store;
+        this.progressLog = progressLogFactory.create(this);
+        this.rangesForEpochHolder = rangesForEpochHolder;
+    }
+
+    public int id()
+    {
+        return id;
+    }
+
+    @Override
+    public Agent agent()
+    {
+        return agent;
+    }
+
+    public abstract boolean inStore();
+
+    public abstract AsyncChain<Void> execute(PreLoadContext context, 
Consumer<? super SafeCommandStore> consumer);
+
+    public abstract <T> AsyncChain<T> submit(PreLoadContext context, 
Function<? super SafeCommandStore, T> apply);
+
+    public abstract void shutdown();
+
+    protected void unsafeRunIn(Runnable fn)
+    {
+        CommandStore prev = maybeCurrent();
+        CURRENT_STORE.set(this);
+        try
+        {
+            fn.run();
+        }
+        finally
+        {
+            if (prev == null) CURRENT_STORE.remove();
+            else CURRENT_STORE.set(prev);
+        }
+    }
+
+    protected <T> T unsafeRunIn(Callable<T> fn) throws Exception
+    {
+        CommandStore prev = maybeCurrent();
+        CURRENT_STORE.set(this);
+        try
+        {
+            return fn.call();
+        }
+        finally
+        {
+            if (prev == null) CURRENT_STORE.remove();
+            else CURRENT_STORE.set(prev);
+        }
+    }
 
     @Override
-    default void execute(Runnable command)
+    public String toString()
+    {
+        return getClass().getSimpleName() + "{" +
+               "id=" + id +
+               '}';
+    }
+
+    @Nullable
+    public static CommandStore maybeCurrent()
+    {
+        return CURRENT_STORE.get();
+    }
+
+    public static CommandStore current()
     {
-        submit(command).begin(agent());
+        CommandStore cs = maybeCurrent();
+        if (cs == null)
+            throw new IllegalStateException("Attempted to access current 
CommandStore, but not running in a CommandStore");
+        return cs;
     }
 
-    void shutdown();
+    protected static void register(CommandStore store)
+    {
+        if (!store.inStore())
+            throw new IllegalStateException("Unable to register a CommandStore 
when not running in it; store " + store);
+        CURRENT_STORE.set(store);
+    }
+
+    public static void checkInStore()
+    {
+        CommandStore store = maybeCurrent();
+        if (store == null) throw new IllegalStateException("Expected to be 
running in a CommandStore but is not");
+    }
+
+    public static void checkNotInStore()
+    {
+        CommandStore store = maybeCurrent();
+        if (store != null)
+            throw new IllegalStateException("Expected to not be running in a 
CommandStore, but running in " + store);
+    }
 }
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java 
b/accord-core/src/main/java/accord/local/CommandStores.java
index 67b4f058..943bc075 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -26,6 +26,8 @@ import accord.utils.MapReduce;
 import accord.utils.MapReduceConsume;
 
 import com.google.common.annotations.VisibleForTesting;
+
+import accord.utils.RandomSource;
 import org.agrona.collections.Hashing;
 import org.agrona.collections.Int2ObjectHashMap;
 import accord.utils.async.AsyncChain;
@@ -35,10 +37,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
-import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.IntStream;
 
 import static accord.local.PreLoadContext.empty;
@@ -54,6 +55,7 @@ public abstract class CommandStores<S extends CommandStore>
         CommandStores<?> create(NodeTimeService time,
                                 Agent agent,
                                 DataStore store,
+                                RandomSource random,
                                 ShardDistributor shardDistributor,
                                 ProgressLog.Factory progressLogFactory);
     }
@@ -65,19 +67,21 @@ public abstract class CommandStores<S extends CommandStore>
         private final DataStore store;
         private final ProgressLog.Factory progressLogFactory;
         private final CommandStore.Factory shardFactory;
+        private final RandomSource random;
 
-        Supplier(NodeTimeService time, Agent agent, DataStore store, 
ProgressLog.Factory progressLogFactory, CommandStore.Factory shardFactory)
+        Supplier(NodeTimeService time, Agent agent, DataStore store, 
RandomSource random, ProgressLog.Factory progressLogFactory, 
CommandStore.Factory shardFactory)
         {
             this.time = time;
             this.agent = agent;
             this.store = store;
+            this.random = random;
             this.progressLogFactory = progressLogFactory;
             this.shardFactory = shardFactory;
         }
 
         CommandStore create(int id, RangesForEpochHolder rangesForEpoch)
         {
-            return shardFactory.create(id, time, agent, store, 
progressLogFactory, rangesForEpoch);
+            return shardFactory.create(id, time, agent, this.store, 
progressLogFactory, rangesForEpoch);
         }
     }
 
@@ -230,10 +234,10 @@ public abstract class CommandStores<S extends 
CommandStore>
         this.current = new Snapshot(new ShardHolder[0], Topology.EMPTY, 
Topology.EMPTY);
     }
 
-    public CommandStores(NodeTimeService time, Agent agent, DataStore store, 
ShardDistributor shardDistributor,
+    public CommandStores(NodeTimeService time, Agent agent, DataStore store, 
RandomSource random, ShardDistributor shardDistributor,
                          ProgressLog.Factory progressLogFactory, 
CommandStore.Factory shardFactory)
     {
-        this(new Supplier(time, agent, store, progressLogFactory, 
shardFactory), shardDistributor);
+        this(new Supplier(time, agent, store, random, progressLogFactory, 
shardFactory), shardDistributor);
     }
 
     public Topology local()
@@ -416,6 +420,30 @@ public abstract class CommandStores<S extends CommandStore>
             shard.store.shutdown();
     }
 
+    public CommandStore select(RoutingKey key)
+    {
+        return  select(ranges -> ranges.contains(key));
+    }
+
+    private CommandStore select(Predicate<Ranges> fn)
+    {
+        ShardHolder[] shards = current.shards;
+        for (ShardHolder holder : shards)
+        {
+            if (fn.test(holder.ranges().currentRanges()))
+                return holder.store;
+        }
+        return any();
+    }
+
+    @VisibleForTesting
+    public CommandStore any()
+    {
+        ShardHolder[] shards = current.shards;
+        if (shards.length == 0) throw new IllegalStateException("Unable to get 
CommandStore; non defined");
+        return shards[supplier.random.nextInt(shards.length)].store;
+    }
+
     public CommandStore forId(int id)
     {
         Snapshot snapshot = current;
diff --git a/accord-core/src/main/java/accord/local/Node.java 
b/accord-core/src/main/java/accord/local/Node.java
index fbf59874..66c0bf6a 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -145,7 +145,7 @@ public class Node implements ConfigurationService.Listener, 
NodeTimeService
         this.agent = agent;
         this.random = random;
         this.scheduler = scheduler;
-        this.commandStores = factory.create(this, agent, dataSupplier.get(), 
shardDistributor, progressLogFactory.apply(this));
+        this.commandStores = factory.create(this, agent, dataSupplier.get(), 
random.fork(), shardDistributor, progressLogFactory.apply(this));
 
         configService.registerListener(this);
         onTopologyUpdate(topology, false);
@@ -203,7 +203,7 @@ public class Node implements ConfigurationService.Listener, 
NodeTimeService
         else
         {
             configService.fetchTopologyForEpoch(epoch);
-            topology.awaitEpoch(epoch).addCallback(runnable);
+            topology.awaitEpoch(epoch).addCallback(runnable).begin(agent());
         }
     }
 
@@ -311,7 +311,13 @@ public class Node implements 
ConfigurationService.Listener, NodeTimeService
 
     public void send(Shard shard, Request send, Callback callback)
     {
-        shard.nodes.forEach(node -> messageSink.send(node, send, callback));
+        send(shard, send, CommandStore.current(), callback);
+    }
+
+    private void send(Shard shard, Request send, AgentExecutor executor, 
Callback callback)
+    {
+        checkStore(executor);
+        shard.nodes.forEach(node -> messageSink.send(node, send, executor, 
callback));
     }
 
     private <T> void send(Shard shard, Request send, Set<Id> alreadyContacted)
@@ -334,18 +340,44 @@ public class Node implements 
ConfigurationService.Listener, NodeTimeService
 
     public <T> void send(Collection<Id> to, Request send, Callback<T> callback)
     {
-        to.forEach(dst -> send(dst, send, callback));
+        send(to, send, CommandStore.current(), callback);
+    }
+
+    public <T> void send(Collection<Id> to, Request send, AgentExecutor 
executor, Callback<T> callback)
+    {
+        checkStore(executor);
+        to.forEach(dst -> messageSink.send(dst, send, executor, callback));
     }
 
     public <T> void send(Collection<Id> to, Function<Id, Request> 
requestFactory, Callback<T> callback)
     {
-        to.forEach(dst -> send(dst, requestFactory.apply(dst), callback));
+        send(to, requestFactory, CommandStore.current(), callback);
+    }
+
+    public <T> void send(Collection<Id> to, Function<Id, Request> 
requestFactory, AgentExecutor executor, Callback<T> callback)
+    {
+        checkStore(executor);
+        to.forEach(dst -> messageSink.send(dst, requestFactory.apply(dst), 
executor, callback));
     }
 
     // send to a specific node
     public <T> void send(Id to, Request send, Callback<T> callback)
     {
-        messageSink.send(to, send, callback);
+        send(to, send, CommandStore.current(), callback);
+    }
+
+    // send to a specific node
+    public <T> void send(Id to, Request send, AgentExecutor executor, 
Callback<T> callback)
+    {
+        checkStore(executor);
+        messageSink.send(to, send, executor, callback);
+    }
+
+    private void checkStore(AgentExecutor executor)
+    {
+        CommandStore current = CommandStore.maybeCurrent();
+        if (current != null && current != executor)
+            throw new IllegalStateException(String.format("Used wrong 
CommandStore %s; current is %s", executor, current));
     }
 
     // send to a specific node
@@ -495,7 +527,7 @@ public class Node implements ConfigurationService.Listener, 
NodeTimeService
         if (unknownEpoch > 0)
         {
             configService.fetchTopologyForEpoch(unknownEpoch);
-            topology().awaitEpoch(unknownEpoch).addCallback(() -> 
receive(request, from, replyContext));
+            topology().awaitEpoch(unknownEpoch).addCallback(() -> 
receive(request, from, replyContext)).begin(agent());
             return;
         }
         scheduler.now(() -> request.process(this, from, replyContext));
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java 
b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index 5b2f5eca..892cbfee 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -105,6 +105,13 @@ public interface SafeCommandStore
         Command command = safeCommand.current();
         for (CommandListener listener : command.listeners())
         {
+            if (!safeCommand.current().listeners().contains(listener))
+            {
+                // notifyListeners is done for every mutation, which can cause 
listeners to be different depending on
+                // where you are in the stack frame...
+                // To simplify listeners, double check that this wasn't 
changed before calling again
+                continue;
+            }
             PreLoadContext context = 
listener.listenerPreLoadContext(command.txnId());
             if (canExecuteWith(context))
             {
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java 
b/accord-core/src/main/java/accord/messages/ReadData.java
index d7558d35..6350793c 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -269,7 +269,7 @@ public class ReadData extends 
AbstractEpochRequest<ReadData.ReadNack> implements
                 case RETURNED:
                     throw new IllegalStateException("ReadOk was sent, yet ack 
called again");
                 case OBSOLETE:
-                    logger.debug("After the read completed for txn {}, the 
result was marked obsolete", txnId);
+                    logger.trace("After the read completed for txn {}, the 
result was marked obsolete", txnId);
                     break;
                 case PENDING:
                     state = State.RETURNED;
diff --git a/accord-core/src/main/java/accord/messages/SafeCallback.java 
b/accord-core/src/main/java/accord/messages/SafeCallback.java
new file mode 100644
index 00000000..a8a1416f
--- /dev/null
+++ b/accord-core/src/main/java/accord/messages/SafeCallback.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.messages;
+
+import java.util.Objects;
+
+import accord.coordinate.Timeout;
+import accord.local.AgentExecutor;
+import accord.local.Node;
+
+public class SafeCallback<T extends Reply>
+{
+    private final AgentExecutor executor;
+    private final Callback<T> callback;
+
+    public SafeCallback(AgentExecutor executor, Callback<T> callback)
+    {
+        this.executor = Objects.requireNonNull(executor, "executor");
+        this.callback = Objects.requireNonNull(callback, "callback");
+    }
+
+    public void success(Node.Id src, T reply)
+    {
+        safeCall(src, reply, Callback::onSuccess);
+    }
+
+    public void slowResponse(Node.Id src)
+    {
+        safeCall(src, null, (callback, id, ignore) -> 
callback.onSlowResponse(id));
+    }
+
+    public void failure(Node.Id to, Throwable t)
+    {
+        safeCall(to, t, Callback::onFailure);
+    }
+
+    public void timeout(Node.Id to)
+    {
+        failure(to, new Timeout(null, null));
+    }
+
+    private interface SafeCall<T, P>
+    {
+        void accept(Callback<T> callback, Node.Id id, P param) throws 
Throwable;
+    }
+
+    private <P> void safeCall(Node.Id src, P param, SafeCall<T, P> call)
+    {
+        // TODO (low priority, correctness): if the executor is shut down this 
propagates the exception to the network stack
+        executor.execute(() -> {
+            try
+            {
+                call.accept(callback, src, param);
+            }
+            catch (Throwable t)
+            {
+                try
+                {
+                    callback.onCallbackFailure(src, t);
+                }
+                catch (Throwable t2)
+                {
+                    t.addSuppressed(t2);
+                    executor.agent().onUncaughtException(t);
+                }
+            }
+        });
+    }
+
+    @Override
+    public String toString()
+    {
+        return callback.toString();
+    }
+}
diff --git a/accord-core/src/main/java/accord/primitives/KeyDeps.java 
b/accord-core/src/main/java/accord/primitives/KeyDeps.java
index f9887b3b..1ac6b65d 100644
--- a/accord-core/src/main/java/accord/primitives/KeyDeps.java
+++ b/accord-core/src/main/java/accord/primitives/KeyDeps.java
@@ -29,7 +29,6 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
 
-import static accord.primitives.Routables.Slice.Overlapping;
 import static accord.utils.ArrayBuffers.*;
 import static accord.utils.RelationMultiMap.*;
 import static accord.utils.SortedArrays.Search.FAST;
diff --git a/accord-core/src/main/java/accord/primitives/PartialRangeRoute.java 
b/accord-core/src/main/java/accord/primitives/PartialRangeRoute.java
index 437ed2b9..0d53d8a1 100644
--- a/accord-core/src/main/java/accord/primitives/PartialRangeRoute.java
+++ b/accord-core/src/main/java/accord/primitives/PartialRangeRoute.java
@@ -22,7 +22,6 @@ import accord.api.RoutingKey;
 import accord.utils.Invariants;
 
 import static accord.primitives.AbstractRanges.UnionMode.MERGE_OVERLAPPING;
-import static accord.primitives.Routables.Slice.Overlapping;
 
 /**
  * A slice of a Route that covers
diff --git a/accord-core/src/main/java/accord/primitives/PartialTxn.java 
b/accord-core/src/main/java/accord/primitives/PartialTxn.java
index 9fa74754..7a6a14c0 100644
--- a/accord-core/src/main/java/accord/primitives/PartialTxn.java
+++ b/accord-core/src/main/java/accord/primitives/PartialTxn.java
@@ -23,7 +23,6 @@ import javax.annotation.Nullable;
 import accord.api.Query;
 import accord.api.Read;
 import accord.api.Update;
-import accord.utils.Invariants;
 
 public interface PartialTxn extends Txn
 {
diff --git a/accord-core/src/main/java/accord/primitives/RangeDeps.java 
b/accord-core/src/main/java/accord/primitives/RangeDeps.java
index 22d49080..568420e8 100644
--- a/accord-core/src/main/java/accord/primitives/RangeDeps.java
+++ b/accord-core/src/main/java/accord/primitives/RangeDeps.java
@@ -35,8 +35,6 @@ import java.util.function.Predicate;
 import static accord.utils.ArrayBuffers.*;
 import static accord.utils.RelationMultiMap.*;
 import static accord.utils.RelationMultiMap.remove;
-import static accord.utils.SearchableRangeListBuilder.Links.LINKS;
-import static accord.utils.SearchableRangeListBuilder.Strategy.ACCURATE;
 import static accord.utils.SortedArrays.Search.CEIL;
 
 /**
diff --git a/accord-core/src/main/java/accord/primitives/Routables.java 
b/accord-core/src/main/java/accord/primitives/Routables.java
index 62798219..74889f28 100644
--- a/accord-core/src/main/java/accord/primitives/Routables.java
+++ b/accord-core/src/main/java/accord/primitives/Routables.java
@@ -26,10 +26,6 @@ import accord.utils.IndexedRangeFoldToLong;
 import accord.utils.SortedArrays;
 import net.nicoulaj.compilecommand.annotations.Inline;
 
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static accord.primitives.Routables.Slice.Overlapping;
 import static accord.utils.SortedArrays.Search.FLOOR;
 
 /**
diff --git a/accord-core/src/main/java/accord/primitives/Txn.java 
b/accord-core/src/main/java/accord/primitives/Txn.java
index f1d26558..7172cb6d 100644
--- a/accord-core/src/main/java/accord/primitives/Txn.java
+++ b/accord-core/src/main/java/accord/primitives/Txn.java
@@ -31,8 +31,6 @@ import accord.utils.async.AsyncChains;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import static accord.primitives.Routables.Slice.Overlapping;
-
 public interface Txn
 {
     enum Kind
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java 
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 6ffbc27a..14817aca 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -22,6 +22,7 @@ import accord.api.ConfigurationService;
 import accord.api.RoutingKey;
 import accord.api.TopologySorter;
 import accord.coordinate.tracking.QuorumTracker;
+import accord.local.CommandStore;
 import accord.local.Node.Id;
 import accord.messages.Request;
 import accord.primitives.*;
@@ -259,9 +260,15 @@ public class TopologyManager implements 
ConfigurationService.Listener
             toComplete.trySuccess(null);
     }
 
-    public synchronized AsyncResult<Void> awaitEpoch(long epoch)
+    public AsyncChain<Void> awaitEpoch(long epoch)
     {
-        return epochs.awaitEpoch(epoch);
+        AsyncResult<Void> result;
+        synchronized (this)
+        {
+            result = epochs.awaitEpoch(epoch);
+        }
+        CommandStore current = CommandStore.maybeCurrent();
+        return current == null || result.isDone() ? result : 
result.withExecutor(current);
     }
 
     @Override
@@ -399,7 +406,10 @@ public class TopologyManager implements 
ConfigurationService.Listener
 
     public Topology localForEpoch(long epoch)
     {
-        return epochs.get(epoch).local();
+        EpochState epochState = epochs.get(epoch);
+        if (epochState == null)
+            throw new IllegalStateException("Unknown epoch " + epoch);
+        return epochState.local();
     }
 
     public Ranges localRangesForEpoch(long epoch)
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChain.java 
b/accord-core/src/main/java/accord/utils/async/AsyncChain.java
index fa88aed4..130731d4 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncChain.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncChain.java
@@ -32,11 +32,21 @@ public interface AsyncChain<V>
      */
     <T> AsyncChain<T> map(Function<? super V, ? extends T> mapper);
 
+    default <T> AsyncChain<T> map(Function<? super V, ? extends T> mapper, 
Executor executor)
+    {
+        return AsyncChains.map(this, mapper, executor);
+    }
+
     /**
      * Support {@link 
com.google.common.util.concurrent.Futures#transform(ListenableFuture, 
com.google.common.base.Function, Executor)} natively
      */
     <T> AsyncChain<T> flatMap(Function<? super V, ? extends AsyncChain<T>> 
mapper);
 
+    default <T> AsyncChain<T> flatMap(Function<? super V, ? extends 
AsyncChain<T>> mapper, Executor executor)
+    {
+        return AsyncChains.flatMap(this, mapper, executor);
+    }
+
     default AsyncChain<Void> accept(Consumer<? super V> action)
     {
         return map(r -> {
@@ -45,6 +55,21 @@ public interface AsyncChain<V>
         });
     }
 
+    default AsyncChain<Void> accept(Consumer<? super V> action, Executor 
executor)
+    {
+        return map(r -> {
+            action.accept(r);
+            return null;
+        }, executor);
+    }
+
+    default AsyncChain<V> withExecutor(Executor e)
+    {
+        // since a chain runs as a sequence of callbacks, by adding a callback 
that moves to this executor any new actions
+        // will be run on that desired executor.
+        return map(a -> a, e);
+    }
+
     /**
      * Support {@link com.google.common.util.concurrent.Futures#addCallback} 
natively
      */
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChains.java 
b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
index 06f3f33e..809ca1c7 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncChains.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
@@ -331,6 +331,67 @@ public abstract class AsyncChains<V> implements 
AsyncChain<V>
         return new Immediate<>(failure);
     }
 
+    public static <V, T> AsyncChain<T> map(AsyncChain<V> chain, Function<? 
super V, ? extends T> mapper, Executor executor)
+    {
+        return chain.flatMap(v -> new Head<T>()
+        {
+            @Override
+            protected void start(BiConsumer<? super T, Throwable> callback)
+            {
+                try
+                {
+                    executor.execute(() -> {
+                        T value;
+                        try
+                        {
+                            value = mapper.apply(v);
+                        }
+                        catch (Throwable t)
+                        {
+                            callback.accept(null, t);
+                            return;
+                        }
+                        callback.accept(value, null);
+                    });
+                }
+                catch (Throwable t)
+                {
+                    // TODO (low priority, correctness): If the executor is 
shut down then the callback may run in an unexpected thread, which may not be 
thread safe
+                    callback.accept(null, t);
+                }
+            }
+        });
+    }
+
+    public static <V, T> AsyncChain<T> flatMap(AsyncChain<V> chain, Function<? 
super V, ? extends AsyncChain<T>> mapper, Executor executor)
+    {
+        return chain.flatMap(v -> new Head<T>()
+        {
+            @Override
+            protected void start(BiConsumer<? super T, Throwable> callback)
+            {
+                try
+                {
+                    executor.execute(() -> {
+                        try
+                        {
+                            mapper.apply(v).addCallback(callback);
+                        }
+                        catch (Throwable t)
+                        {
+                            callback.accept(null, t);
+                        }
+                    });
+                }
+                catch (Throwable t)
+                {
+                    // TODO (low priority, correctness): If the executor is 
shut down then the callback may run in an unexpected thread, which may not be 
thread safe
+                    callback.accept(null, t);
+                }
+            }
+        });
+    }
+
     public static <V> AsyncChain<V> ofCallable(Executor executor, Callable<V> 
callable)
     {
         return new Head<V>()
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java 
b/accord-core/src/test/java/accord/burn/BurnTest.java
index 78269a79..c4feef07 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -49,6 +49,7 @@ import accord.impl.basic.PendingQueue;
 import accord.impl.basic.PropagatingPendingQueue;
 import accord.impl.basic.RandomDelayQueue.Factory;
 import accord.impl.basic.SimulatedDelayedExecutorService;
+import accord.impl.list.ListAgent;
 import accord.impl.list.ListQuery;
 import accord.impl.list.ListRead;
 import accord.impl.list.ListRequest;
@@ -199,7 +200,8 @@ public class BurnTest
     {
         List<Throwable> failures = Collections.synchronizedList(new 
ArrayList<>());
         PendingQueue queue = new PropagatingPendingQueue(failures, new 
Factory(random).get());
-        SimulatedDelayedExecutorService globalExecutor = new 
SimulatedDelayedExecutorService(queue, random.fork());
+        ListAgent agent = new ListAgent(30L, failures::add);
+        SimulatedDelayedExecutorService globalExecutor = new 
SimulatedDelayedExecutorService(queue, agent, random.fork());
 
         StrictSerializabilityVerifier strictSerializable = new 
StrictSerializabilityVerifier(keyCount);
         Function<CommandStore, AsyncExecutor> executor = ignore -> 
globalExecutor;
@@ -276,7 +278,7 @@ public class BurnTest
         try
         {
             Cluster.run(toArray(nodes, Id[]::new), () -> queue,
-                        responseSink, failures::add,
+                        responseSink, globalExecutor,
                         () -> random.fork(),
                         () -> new AtomicLong()::incrementAndGet,
                         topologyFactory, () -> null);
@@ -356,7 +358,7 @@ public class BurnTest
         catch (Throwable t)
         {
             logger.error("Exception running burn test for seed {}:", seed, t);
-            throw t;
+            throw SimulationException.wrap(seed, t);
         }
     }
 
diff --git 
a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java 
b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
index 2f86e935..3f718856 100644
--- a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
+++ b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
@@ -18,8 +18,8 @@
 
 package accord.burn;
 
-import accord.api.MessageSink;
 import accord.api.TestableConfigurationService;
+import accord.local.AgentExecutor;
 import accord.utils.RandomSource;
 import accord.local.Node;
 import accord.messages.*;
@@ -42,7 +42,7 @@ public class BurnTestConfigurationService implements 
TestableConfigurationServic
     private static final Logger logger = 
LoggerFactory.getLogger(BurnTestConfigurationService.class);
 
     private final Node.Id node;
-    private final MessageSink messageSink;
+    private final AgentExecutor executor;
     private final Function<Node.Id, Node> lookup;
     private final Supplier<RandomSource> randomSupplier;
     private final Map<Long, FetchTopology> pendingEpochs = new HashMap<>();
@@ -128,10 +128,10 @@ public class BurnTestConfigurationService implements 
TestableConfigurationServic
         }
     }
 
-    public BurnTestConfigurationService(Node.Id node, MessageSink messageSink, 
Supplier<RandomSource> randomSupplier, Topology topology, Function<Node.Id, 
Node> lookup, TopologyUpdates topologyUpdates)
+    public BurnTestConfigurationService(Node.Id node, AgentExecutor executor, 
Supplier<RandomSource> randomSupplier, Topology topology, Function<Node.Id, 
Node> lookup, TopologyUpdates topologyUpdates)
     {
         this.node = node;
-        this.messageSink = messageSink;
+        this.executor = executor;
         this.randomSupplier = randomSupplier;
         this.lookup = lookup;
         this.topologyUpdates = topologyUpdates;
@@ -218,10 +218,10 @@ public class BurnTestConfigurationService implements 
TestableConfigurationServic
         {
             this.request = new FetchTopologyRequest(epoch);
             this.candidates = new ArrayList<>();
-            sendNext();
+            executor.execute(this::sendNext);
         }
 
-        synchronized void sendNext()
+        void sendNext()
         {
             if (candidates.isEmpty())
             {
@@ -230,7 +230,7 @@ public class BurnTestConfigurationService implements 
TestableConfigurationServic
             }
             int idx = randomSupplier.get().nextInt(candidates.size());
             Node.Id node = candidates.remove(idx);
-            messageSink.send(node, request, this);
+            originator().send(node, request, executor, this);
         }
 
         @Override
@@ -243,7 +243,7 @@ public class BurnTestConfigurationService implements 
TestableConfigurationServic
         }
 
         @Override
-        public synchronized void onFailure(Node.Id from, Throwable failure)
+        public void onFailure(Node.Id from, Throwable failure)
         {
             sendNext();
         }
@@ -269,10 +269,15 @@ public class BurnTestConfigurationService implements 
TestableConfigurationServic
     {
         epochs.acknowledge(epoch);
         Topology topology = getTopologyForEpoch(epoch);
-        Node originator = lookup.apply(node);
+        Node originator = originator();
         topologyUpdates.syncEpoch(originator, epoch - 1, topology.nodes());
     }
 
+    private Node originator()
+    {
+        return lookup.apply(node);
+    }
+
     @Override
     public synchronized void reportTopology(Topology topology)
     {
diff --git a/accord-core/src/test/java/accord/impl/list/ListData.java 
b/accord-core/src/test/java/accord/burn/SimulationException.java
similarity index 54%
copy from accord-core/src/test/java/accord/impl/list/ListData.java
copy to accord-core/src/test/java/accord/burn/SimulationException.java
index cc582276..08ecae50 100644
--- a/accord-core/src/test/java/accord/impl/list/ListData.java
+++ b/accord-core/src/test/java/accord/burn/SimulationException.java
@@ -16,31 +16,29 @@
  * limitations under the License.
  */
 
-package accord.impl.list;
+package accord.burn;
 
-import java.util.Arrays;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.stream.Collectors;
+public class SimulationException extends RuntimeException
+{
+    public SimulationException(long seed, Throwable t)
+    {
+        super(createMsg(seed, null), t, true, false);
+    }
 
-import accord.api.Data;
-import accord.api.Key;
+    public SimulationException(long seed, String msg, Throwable t)
+    {
+        super(createMsg(seed, msg), t, true, false);
+    }
 
-public class ListData extends TreeMap<Key, int[]> implements Data
-{
-    @Override
-    public Data merge(Data data)
+    private static String createMsg(long seed, String msg)
     {
-        if (data != null)
-            this.putAll(((ListData)data));
-        return this;
+        return String.format("Failed on seed %d%s", seed, msg == null ? "" : 
"; " + msg);
     }
 
-    @Override
-    public String toString()
+    public static SimulationException wrap(long seed, Throwable t)
     {
-        return entrySet().stream()
-                         .map(e -> e.getKey() + "=" + 
Arrays.toString(e.getValue()))
-                         .collect(Collectors.joining(", ", "{", "}"));
+        if (t instanceof SimulationException)
+            return (SimulationException) t;
+        return new SimulationException(seed, t);
     }
 }
diff --git a/accord-core/src/test/java/accord/burn/TopologyUpdates.java 
b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
index ffea504c..e2e897ee 100644
--- a/accord-core/src/test/java/accord/burn/TopologyUpdates.java
+++ b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
@@ -21,8 +21,9 @@ package accord.burn;
 import accord.api.TestableConfigurationService;
 import accord.impl.InMemoryCommandStore;
 import accord.coordinate.FetchData;
-import accord.impl.InMemoryCommandStores;
+import accord.local.AgentExecutor;
 import accord.local.Command;
+import accord.local.CommandStore;
 import accord.local.Commands;
 import accord.local.Node;
 import accord.local.Status;
@@ -43,7 +44,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -53,7 +53,6 @@ import static accord.coordinate.Invalidate.invalidate;
 import static accord.local.PreLoadContext.contextFor;
 import static accord.local.Status.*;
 import static accord.local.Status.Known.*;
-import static accord.utils.async.AsyncChains.getUninterruptibly;
 
 public class TopologyUpdates
 {
@@ -91,7 +90,7 @@ public class TopologyUpdates
             if (!node.topology().hasEpoch(toEpoch))
             {
                 node.configService().fetchTopologyForEpoch(toEpoch);
-                node.topology().awaitEpoch(toEpoch).addCallback(() -> 
process(node, onDone));
+                node.topology().awaitEpoch(toEpoch).addCallback(() -> 
process(node, onDone)).begin(node.agent());
                 return;
             }
 
@@ -142,19 +141,21 @@ public class TopologyUpdates
                         dieExceptionally(invalidate.addCallback(((unused, 
failure) -> onDone.accept(failure == null))).beginAsResult());
                 }
                 return null;
-            }).beginAsResult();
+            }, node.commandStores().any()).beginAsResult();
             dieExceptionally(sync);
         }
     }
 
     private final Set<Long> pendingTopologies = Sets.newConcurrentHashSet();
+    private final AgentExecutor executor;
 
     public static <T> BiConsumer<T, Throwable> dieOnException()
     {
         return (result, throwable) -> {
             if (throwable != null)
             {
-                logger.error("", throwable);
+                logger.error("Unexpected exception", throwable);
+                logger.error("", new Throwable("Shutting down test"));
                 System.exit(1);
             }
         };
@@ -166,10 +167,16 @@ public class TopologyUpdates
         return stage;
     }
 
+    public TopologyUpdates(AgentExecutor executor)
+    {
+        this.executor = executor;
+    }
+
     public MessageTask notify(Node originator, Collection<Node.Id> cluster, 
Topology update)
     {
         pendingTopologies.add(update.epoch());
-        return MessageTask.begin(originator, cluster, "TopologyNotify:" + 
update.epoch(), (node, from, onDone) -> {
+        CommandStore.checkNotInStore();
+        return MessageTask.begin(originator, cluster, executor, 
"TopologyNotify:" + update.epoch(), (node, from, onDone) -> {
             long nodeEpoch = node.topology().epoch();
             if (nodeEpoch + 1 < update.epoch())
                 onDone.accept(false);
@@ -186,7 +193,7 @@ public class TopologyUpdates
         return result;
     }
 
-    private static AsyncChain<Stream<MessageTask>> syncEpochCommands(Node 
node, long srcEpoch, Ranges ranges, Function<CommandSync, Collection<Node.Id>> 
recipients, long trgEpoch, boolean committedOnly)
+    private AsyncChain<Stream<MessageTask>> syncEpochCommands(Node node, long 
srcEpoch, Ranges ranges, Function<CommandSync, Collection<Node.Id>> recipients, 
long trgEpoch, boolean committedOnly)
     {
         Map<TxnId, CheckStatusOk> syncMessages = new ConcurrentHashMap<>();
         Consumer<Command> commandConsumer = command -> 
syncMessages.merge(command.txnId(), new CheckStatusOk(node, command), 
CheckStatusOk::merge);
@@ -198,7 +205,7 @@ public class TopologyUpdates
 
         return start.map(ignore -> syncMessages.entrySet().stream().map(e -> {
             CommandSync sync = new CommandSync(e.getKey(), e.getValue(), 
srcEpoch, trgEpoch);
-            return MessageTask.of(node, recipients.apply(sync), 
sync.toString(), sync::process);
+            return MessageTask.of(node, recipients.apply(sync), executor, 
sync.toString(), sync::process);
         }));
     }
 
@@ -208,13 +215,14 @@ public class TopologyUpdates
     /**
      * Syncs all replicated commands. Overkill, but useful for confirming 
issues in optimizedSync
      */
-    private static AsyncChain<Stream<MessageTask>> thoroughSync(Node node, 
long syncEpoch)
+    private AsyncChain<Stream<MessageTask>> thoroughSync(Node node, long 
syncEpoch)
     {
         Topology syncTopology = 
node.configService().getTopologyForEpoch(syncEpoch);
         Topology localTopology = syncTopology.forNode(node.id());
         Function<CommandSync, Collection<Node.Id>> allNodes = cmd -> 
node.topology().withUnsyncedEpochs(cmd.route, syncEpoch + 1).nodes();
 
         Ranges ranges = localTopology.ranges();
+
         List<AsyncChain<Stream<MessageTask>>> work = new ArrayList<>();
         for (long epoch=1; epoch<=syncEpoch; epoch++)
             work.add(syncEpochCommands(node, epoch, ranges, allNodes, 
syncEpoch, COMMITTED_ONLY));
@@ -224,7 +232,7 @@ public class TopologyUpdates
     /**
      * Syncs all newly replicated commands when nodes are gaining ranges and 
the current epoch
      */
-    private static AsyncChain<Stream<MessageTask>> optimizedSync(Node node, 
long srcEpoch)
+    private AsyncChain<Stream<MessageTask>> optimizedSync(Node node, long 
srcEpoch)
     {
         long trgEpoch = srcEpoch + 1;
         Topology syncTopology = 
node.configService().getTopologyForEpoch(srcEpoch);
@@ -264,7 +272,7 @@ public class TopologyUpdates
         return AsyncChains.reduce(work, Stream.empty(), Stream::concat);
     }
 
-    private static AsyncChain<Void> sync(Node node, long syncEpoch)
+    private AsyncChain<Void> sync(Node node, long syncEpoch)
     {
         return optimizedSync(node, syncEpoch)
                 .flatMap(messageStream -> {
@@ -288,7 +296,7 @@ public class TopologyUpdates
     public AsyncResult<Void> syncEpoch(Node originator, long epoch, 
Collection<Node.Id> cluster)
     {
         AsyncResult<Void> result = dieExceptionally(sync(originator, epoch)
-                .flatMap(v -> MessageTask.apply(originator, cluster, 
"SyncComplete:" + epoch, (node, from, onDone) -> {
+                                                    .flatMap(v -> 
MessageTask.apply(originator, cluster, executor, "SyncComplete:" + epoch, 
(node, from, onDone) -> {
                     node.onEpochSyncComplete(originator.id(), epoch);
                     onDone.accept(true);
                 })).beginAsResult());
diff --git a/accord-core/src/test/java/accord/coordinate/CoordinateTest.java 
b/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
index 0d6b43a9..021adf44 100644
--- a/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
+++ b/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
@@ -23,6 +23,7 @@ import accord.impl.mock.MockCluster;
 import accord.api.Result;
 import accord.impl.mock.MockStore;
 import accord.primitives.*;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
diff --git 
a/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java 
b/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
index 06d66817..a76caa1c 100644
--- a/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
+++ b/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
@@ -22,8 +22,6 @@ import accord.impl.mock.MockCluster;
 import accord.impl.mock.MockConfigurationService;
 import accord.local.Command;
 import accord.local.Node;
-import accord.local.PreLoadContext;
-import accord.local.Status;
 import accord.primitives.Range;
 import accord.topology.Topology;
 import accord.primitives.Keys;
diff --git 
a/accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerTest.java 
b/accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerTest.java
index 35418b58..d037cd75 100644
--- a/accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerTest.java
+++ b/accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerTest.java
@@ -20,7 +20,6 @@ package accord.coordinate.tracking;
 
 import accord.impl.TopologyUtils;
 import accord.local.Node.Id;
-import accord.local.Node;
 import accord.primitives.Ranges;
 import accord.topology.Shard;
 import accord.topology.Topologies;
diff --git 
a/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java 
b/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java
index 3736dd8f..fc7038fe 100644
--- 
a/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java
+++ 
b/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java
@@ -19,6 +19,10 @@
 package accord.coordinate.tracking;
 
 import accord.burn.TopologyUpdates;
+import accord.impl.TestAgent;
+import accord.impl.basic.RandomDelayQueue;
+import accord.impl.basic.SimulatedDelayedExecutorService;
+import accord.local.AgentExecutor;
 import accord.utils.DefaultRandom;
 import accord.utils.RandomSource;
 import accord.impl.IntHashKey;
@@ -96,13 +100,14 @@ public abstract class TrackerReconciler<ST extends 
ShardTracker, T extends Abstr
     {
         System.out.println("seed: " + seed);
         RandomSource random = new DefaultRandom(seed);
-        return topologies(random).map(topologies -> constructor.apply(random, 
topologies))
+        SimulatedDelayedExecutorService executor = new 
SimulatedDelayedExecutorService(new RandomDelayQueue.Factory(random).get(), new 
TestAgent(), random);
+        return topologies(random, executor).map(topologies -> 
constructor.apply(random, topologies))
                 .collect(Collectors.toList());
     }
 
     // TODO (required, testing): generalise and parameterise topology 
generation a bit more
     //                           also, select a subset of the generated 
topologies to correctly simulate topology consumption logic
-    private static Stream<Topologies> topologies(RandomSource random)
+    private static Stream<Topologies> topologies(RandomSource random, 
AgentExecutor executor)
     {
         TopologyFactory factory = new TopologyFactory(2 + random.nextInt(3), 
IntHashKey.ranges(4 + random.nextInt(12)));
         List<Id> nodes = cluster(factory.rf * (1 + 
random.nextInt(factory.shardRanges.length - 1)));
@@ -117,7 +122,7 @@ public abstract class TrackerReconciler<ST extends 
ShardTracker, T extends Abstr
 
         Deque<Topology> topologies = new ArrayDeque<>();
         topologies.add(topology);
-        TopologyUpdates topologyUpdates = new TopologyUpdates();
+        TopologyUpdates topologyUpdates = new TopologyUpdates(executor);
         TopologyRandomizer configRandomizer = new TopologyRandomizer(() -> 
random, topology, topologyUpdates, (id, top) -> {});
         while (--count > 0)
         {
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java 
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 95b00b9a..6d549d45 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -38,13 +38,13 @@ import accord.api.MessageSink;
 import accord.burn.BurnTestConfigurationService;
 import accord.burn.TopologyUpdates;
 import accord.impl.*;
+import accord.local.AgentExecutor;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.api.Scheduler;
-import accord.impl.list.ListAgent;
 import accord.impl.list.ListStore;
 import accord.local.ShardDistributor;
-import accord.messages.Callback;
+import accord.messages.SafeCallback;
 import accord.messages.Reply;
 import accord.messages.Request;
 import accord.topology.TopologyRandomizer;
@@ -150,24 +150,12 @@ public class Cluster implements Scheduler
             if (deliver.message instanceof Reply)
             {
                 Reply reply = (Reply) deliver.message;
-                Callback callback = reply.isFinal()
-                        ? 
sinks.get(deliver.dst).callbacks.remove(deliver.replyId)
-                        : 
sinks.get(deliver.dst).callbacks.get(deliver.replyId);
+                SafeCallback callback = reply.isFinal()
+                                        ? 
sinks.get(deliver.dst).callbacks.remove(deliver.replyId)
+                                        : 
sinks.get(deliver.dst).callbacks.get(deliver.replyId);
 
                 if (callback != null)
-                {
-                    on.scheduler().now(() -> {
-                        try
-                        {
-                            callback.onSuccess(deliver.src, reply);
-                        }
-                        catch (Throwable t)
-                        {
-                            callback.onCallbackFailure(deliver.src, t);
-                            on.agent().onUncaughtException(t);
-                        }
-                    });
-                }
+                    callback.success(deliver.src, reply);
             }
             else on.receive((Request) deliver.message, deliver.src, deliver);
         }
@@ -206,24 +194,25 @@ public class Cluster implements Scheduler
         run.run();
     }
 
-    public static void run(Id[] nodes, Supplier<PendingQueue> queueSupplier, 
Consumer<Packet> responseSink, Consumer<Throwable> onFailure, 
Supplier<RandomSource> randomSupplier, Supplier<LongSupplier> nowSupplier, 
TopologyFactory topologyFactory, Supplier<Packet> in)
+    public static void run(Id[] nodes, Supplier<PendingQueue> queueSupplier, 
Consumer<Packet> responseSink, AgentExecutor executor, Supplier<RandomSource> 
randomSupplier, Supplier<LongSupplier> nowSupplier, TopologyFactory 
topologyFactory, Supplier<Packet> in)
     {
-        TopologyUpdates topologyUpdates = new TopologyUpdates();
+
         Topology topology = topologyFactory.toTopology(nodes);
         Map<Id, Node> lookup = new LinkedHashMap<>();
-        TopologyRandomizer configRandomizer = new 
TopologyRandomizer(randomSupplier, topology, topologyUpdates, lookup::get);
         try
         {
             Cluster sinks = new Cluster(queueSupplier, lookup::get, 
responseSink);
+            TopologyUpdates topologyUpdates = new TopologyUpdates(executor);
+            TopologyRandomizer configRandomizer = new 
TopologyRandomizer(randomSupplier, topology, topologyUpdates, lookup::get);
             for (Id node : nodes)
             {
                 MessageSink messageSink = sinks.create(node, 
randomSupplier.get());
-                BurnTestConfigurationService configService = new 
BurnTestConfigurationService(node, messageSink, randomSupplier, topology, 
lookup::get, topologyUpdates);
+                BurnTestConfigurationService configService = new 
BurnTestConfigurationService(node, executor, randomSupplier, topology, 
lookup::get, topologyUpdates);
                 lookup.put(node, new Node(node, messageSink, configService, 
nowSupplier.get(),
                                           () -> new ListStore(node), new 
ShardDistributor.EvenSplit<>(8, ignore -> new IntHashKey.Splitter()),
-                                          new ListAgent(30L, onFailure),
+                                          executor.agent(),
                                           randomSupplier.get(), sinks, 
SizeOfIntersectionSorter.SUPPLIER,
-                                          SimpleProgressLog::new, 
DelayedCommandStores.factory(sinks.pending, randomSupplier.get())));
+                                          SimpleProgressLog::new, 
DelayedCommandStores.factory(sinks.pending)));
             }
 
             List<Id> nodesList = new ArrayList<>(Arrays.asList(nodes));
diff --git 
a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java 
b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
index 8e84f8e4..72153c24 100644
--- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -18,26 +18,106 @@
 
 package accord.impl.basic;
 
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
 import accord.api.Agent;
 import accord.api.DataStore;
 import accord.api.ProgressLog;
 import accord.impl.InMemoryCommandStore;
 import accord.impl.InMemoryCommandStores;
+import accord.impl.basic.TaskExecutorService.Task;
+import accord.local.CommandStore;
 import accord.local.CommandStores;
 import accord.local.NodeTimeService;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommandStore;
 import accord.local.ShardDistributor;
 import accord.utils.RandomSource;
+import accord.utils.async.AsyncChain;
 
 public class DelayedCommandStores extends InMemoryCommandStores.SingleThread
 {
-    private DelayedCommandStores(NodeTimeService time, Agent agent, DataStore 
store, ShardDistributor shardDistributor, ProgressLog.Factory 
progressLogFactory, SimulatedDelayedExecutorService executorService)
+    private DelayedCommandStores(NodeTimeService time, Agent agent, DataStore 
store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory, SimulatedDelayedExecutorService 
executorService)
+    {
+        super(time, agent, store, random, shardDistributor, 
progressLogFactory, DelayedCommandStore.factory(executorService));
+    }
+
+    public static CommandStores.Factory factory(PendingQueue pending)
     {
-        super(time, agent, store, shardDistributor, progressLogFactory, 
InMemoryCommandStore.SingleThread.factory(executorService));
+        return (time, agent, store, random, shardDistributor, 
progressLogFactory) ->
+               new DelayedCommandStores(time, agent, store, random, 
shardDistributor, progressLogFactory, new 
SimulatedDelayedExecutorService(pending, agent, random));
     }
 
-    public static CommandStores.Factory factory(PendingQueue pending, 
RandomSource random)
+    public static class DelayedCommandStore extends InMemoryCommandStore
     {
-        SimulatedDelayedExecutorService executorService = new 
SimulatedDelayedExecutorService(pending, random);
-        return (time, agent, store, shardDistributor, progressLogFactory) -> 
new DelayedCommandStores(time, agent, store, shardDistributor, 
progressLogFactory, executorService);
+        private final SimulatedDelayedExecutorService executor;
+        private final Queue<Task<?>> pending = new LinkedList<>();
+
+        public DelayedCommandStore(int id, NodeTimeService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, RangesForEpochHolder 
rangesForEpochHolder, SimulatedDelayedExecutorService executor)
+        {
+            super(id, time, agent, store, progressLogFactory, 
rangesForEpochHolder);
+            this.executor = executor;
+        }
+
+        private static CommandStore.Factory 
factory(SimulatedDelayedExecutorService executor)
+        {
+            return (id, time, agent, store, progressLogFactory, 
rangesForEpoch) -> new DelayedCommandStore(id, time, agent, store, 
progressLogFactory, rangesForEpoch, executor);
+        }
+
+        @Override
+        public boolean inStore()
+        {
+            return CommandStore.maybeCurrent() == this;
+        }
+
+        @Override
+        public AsyncChain<Void> execute(PreLoadContext context, Consumer<? 
super SafeCommandStore> consumer)
+        {
+            return submit(context, i -> { consumer.accept(i); return null; });
+        }
+
+        @Override
+        public <T> AsyncChain<T> submit(PreLoadContext context, Function<? 
super SafeCommandStore, T> function)
+        {
+            return submit(() -> executeInContext(this, context, function));
+        }
+
+        @Override
+        public <T> AsyncChain<T> submit(Callable<T> fn)
+        {
+            Task<T> task = new Task<>(() -> this.unsafeRunIn(fn));
+            boolean wasEmpty = pending.isEmpty();
+            pending.add(task);
+            if (wasEmpty)
+                runNextTask();
+            return task;
+        }
+
+        private void runNextTask()
+        {
+            Task<?> next = pending.peek();
+            if (next == null)
+                return;
+
+            next.addCallback(agent()); // used to track unexpected exceptions 
and notify simulations
+            next.addCallback(this::afterExecution);
+            executor.execute(next);
+        }
+
+        private void afterExecution()
+        {
+            pending.poll();
+            runNextTask();
+        }
+
+        @Override
+        public void shutdown()
+        {
+
+        }
     }
 }
diff --git a/accord-core/src/test/java/accord/impl/basic/NodeSink.java 
b/accord-core/src/test/java/accord/impl/basic/NodeSink.java
index bad4ed09..d0d70402 100644
--- a/accord-core/src/test/java/accord/impl/basic/NodeSink.java
+++ b/accord-core/src/test/java/accord/impl/basic/NodeSink.java
@@ -23,8 +23,9 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
+import accord.local.AgentExecutor;
+import accord.messages.SafeCallback;
 import accord.utils.RandomSource;
-import accord.coordinate.Timeout;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.api.MessageSink;
@@ -43,7 +44,7 @@ public class NodeSink implements MessageSink
     final RandomSource random;
 
     int nextMessageId = 0;
-    Map<Long, Callback> callbacks = new LinkedHashMap<>();
+    Map<Long, SafeCallback> callbacks = new LinkedHashMap<>();
 
     public NodeSink(Id self, Function<Id, Node> lookup, Cluster parent, 
RandomSource random)
     {
@@ -60,39 +61,19 @@ public class NodeSink implements MessageSink
     }
 
     @Override
-    public void send(Id to, Request send, Callback callback)
+    public void send(Id to, Request send, AgentExecutor executor, Callback 
callback)
     {
         long messageId = nextMessageId++;
-        callbacks.put(messageId, callback);
+        SafeCallback sc = new SafeCallback(executor, callback);
+        callbacks.put(messageId, sc);
         parent.add(self, to, messageId, send);
         parent.pending.add((PendingRunnable) () -> {
-            if (callback == callbacks.get(messageId))
-            {
-                try
-                {
-                    callback.onSlowResponse(to);
-                }
-                catch (Throwable t)
-                {
-                    callback.onCallbackFailure(to, t);
-                    lookup.apply(self).agent().onUncaughtException(t);
-                }
-
-            }
+            if (sc == callbacks.get(messageId))
+                sc.slowResponse(to);
         }, 100 + random.nextInt(200), TimeUnit.MILLISECONDS);
         parent.pending.add((PendingRunnable) () -> {
-            if (callback == callbacks.remove(messageId))
-            {
-                try
-                {
-                    callback.onFailure(to, new Timeout(null, null));
-                }
-                catch (Throwable t)
-                {
-                    callback.onCallbackFailure(to, t);
-                    lookup.apply(self).agent().onUncaughtException(t);
-                }
-            }
+            if (sc == callbacks.remove(messageId))
+                sc.timeout(to);
         }, 1000 + random.nextInt(10000), TimeUnit.MILLISECONDS);
     }
 
diff --git 
a/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
 
b/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
index 850f717d..e21cc779 100644
--- 
a/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
+++ 
b/accord-core/src/test/java/accord/impl/basic/SimulatedDelayedExecutorService.java
@@ -18,9 +18,9 @@
 
 package accord.impl.basic;
 
-import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
+import accord.api.Agent;
 import accord.burn.random.FrequentLargeRange;
 import accord.burn.random.RandomLong;
 import accord.burn.random.RandomWalkRange;
@@ -29,12 +29,14 @@ import accord.utils.RandomSource;
 public class SimulatedDelayedExecutorService extends TaskExecutorService
 {
     private final PendingQueue pending;
+    private final Agent agent;
     private final RandomSource random;
     private final RandomLong jitterInNano;
 
-    public SimulatedDelayedExecutorService(PendingQueue pending, RandomSource 
random)
+    public SimulatedDelayedExecutorService(PendingQueue pending, Agent agent, 
RandomSource random)
     {
         this.pending = pending;
+        this.agent = agent;
         this.random = random;
         // this is different from Apache Cassandra Simulator as this is 
computed differently for each executor
         // rather than being a global config
@@ -60,10 +62,9 @@ public class SimulatedDelayedExecutorService extends 
TaskExecutorService
         pending.add(task, jitterInNano.getLong(random), TimeUnit.NANOSECONDS);
     }
 
-    public <T> Task<T> submit(Callable<T> fn, long delay, TimeUnit unit)
+    @Override
+    public Agent agent()
     {
-        Task<T> task = newTaskFor(fn);
-        pending.add(task, jitterInNano.getLong(random) + unit.toNanos(delay), 
TimeUnit.NANOSECONDS);
-        return task;
+        return agent;
     }
 }
\ No newline at end of file
diff --git 
a/accord-core/src/test/java/accord/impl/basic/TaskExecutorService.java 
b/accord-core/src/test/java/accord/impl/basic/TaskExecutorService.java
index 2ab766bb..c68bdff8 100644
--- a/accord-core/src/test/java/accord/impl/basic/TaskExecutorService.java
+++ b/accord-core/src/test/java/accord/impl/basic/TaskExecutorService.java
@@ -27,10 +27,10 @@ import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import accord.utils.async.AsyncExecutor;
+import accord.local.AgentExecutor;
 import accord.utils.async.AsyncResults;
 
-public abstract class TaskExecutorService extends AbstractExecutorService 
implements AsyncExecutor
+public abstract class TaskExecutorService extends AbstractExecutorService 
implements AgentExecutor
 {
     public static class Task<T> extends AsyncResults.SettableResult<T> 
implements Pending, RunnableFuture<T>
     {
diff --git 
a/accord-core/src/test/java/accord/impl/basic/UniformRandomQueue.java 
b/accord-core/src/test/java/accord/impl/basic/UniformRandomQueue.java
deleted file mode 100644
index 45d55c30..00000000
--- a/accord-core/src/test/java/accord/impl/basic/UniformRandomQueue.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package accord.impl.basic;
-
-import accord.utils.RandomSource;
-
-import java.util.PriorityQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-
-public class UniformRandomQueue<T> implements PendingQueue
-{
-    public static class Factory implements Supplier<PendingQueue>
-    {
-        final RandomSource seeds;
-
-        public Factory(RandomSource seeds)
-        {
-            this.seeds = seeds;
-        }
-
-        @Override
-        public PendingQueue get()
-        {
-            return new UniformRandomQueue<>(seeds.fork());
-        }
-    }
-
-    static class Item implements Comparable<Item>
-    {
-        final double priority;
-        final Pending value;
-
-        Item(double priority, Pending value)
-        {
-            this.priority = priority;
-            this.value = value;
-        }
-
-        @Override
-        public int compareTo(Item that)
-        {
-            return Double.compare(this.priority, that.priority);
-        }
-    }
-
-    final PriorityQueue<Item> queue = new PriorityQueue<>();
-    final RandomSource random;
-
-    public UniformRandomQueue(RandomSource random)
-    {
-        this.random = random;
-    }
-
-    @Override
-    public int size()
-    {
-        return queue.size();
-    }
-
-    @Override
-    public void add(Pending item)
-    {
-        queue.add(new Item(random.nextDouble(), item));
-    }
-
-    @Override
-    public void add(Pending item, long delay, TimeUnit units)
-    {
-        queue.add(new Item(random.nextDouble(), item));
-    }
-
-    @Override
-    public Pending poll()
-    {
-        return unwrap(queue.poll());
-    }
-
-    private static Pending unwrap(Item e)
-    {
-        return e == null ? null : e.value;
-    }
-}
diff --git a/accord-core/src/test/java/accord/impl/list/ListData.java 
b/accord-core/src/test/java/accord/impl/list/ListData.java
index cc582276..26874ac7 100644
--- a/accord-core/src/test/java/accord/impl/list/ListData.java
+++ b/accord-core/src/test/java/accord/impl/list/ListData.java
@@ -19,7 +19,6 @@
 package accord.impl.list;
 
 import java.util.Arrays;
-import java.util.Map;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
diff --git a/accord-core/src/test/java/accord/impl/list/ListRequest.java 
b/accord-core/src/test/java/accord/impl/list/ListRequest.java
index f2a9883c..8fbe8941 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRequest.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRequest.java
@@ -111,21 +111,23 @@ public class ListRequest implements Request
                 ((Cluster)node.scheduler()).onDone(() -> {
                     RoutingKey homeKey = ((CoordinateFailed) fail).homeKey();
                     TxnId txnId = ((CoordinateFailed) fail).txnId();
-                    CheckOnResult.checkOnResult(node, txnId, homeKey, (s, f) 
-> {
-                        if (f != null)
-                            return;
-                        switch (s)
-                        {
-                            case Invalidated:
-                                node.reply(client, replyContext, new 
ListResult(client, ((Packet)replyContext).requestId, txnId, null, null, null, 
null));
-                                break;
-                            case Lost:
-                                node.reply(client, replyContext, new 
ListResult(client, ((Packet)replyContext).requestId, txnId, null, null, new 
int[0][], null));
-                                break;
-                            case Neither:
-                                // currently caught elsewhere in response 
tracking, but might help to throw an exception here
-                        }
-                    });
+                    node.commandStores()
+                        .select(homeKey)
+                        .execute(() -> CheckOnResult.checkOnResult(node, 
txnId, homeKey, (s, f) -> {
+                            if (f != null)
+                                return;
+                            switch (s)
+                            {
+                                case Invalidated:
+                                    node.reply(client, replyContext, new 
ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, null, 
null));
+                                    break;
+                                case Lost:
+                                    node.reply(client, replyContext, new 
ListResult(client, ((Packet) replyContext).requestId, txnId, null, null, new 
int[0][], null));
+                                    break;
+                                case Neither:
+                                    // currently caught elsewhere in response 
tracking, but might help to throw an exception here
+                            }
+                        }));
                 });
             }
         }
diff --git a/accord-core/src/test/java/accord/impl/list/ListStore.java 
b/accord-core/src/test/java/accord/impl/list/ListStore.java
index cf79d139..b2986e58 100644
--- a/accord-core/src/test/java/accord/impl/list/ListStore.java
+++ b/accord-core/src/test/java/accord/impl/list/ListStore.java
@@ -20,7 +20,6 @@ package accord.impl.list;
 
 import java.util.*;
 import java.util.AbstractMap.SimpleEntry;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 import accord.api.Key;
@@ -28,7 +27,6 @@ import accord.local.Node;
 import accord.api.DataStore;
 import accord.primitives.Range;
 import accord.primitives.RoutableKey;
-import accord.primitives.Seekable;
 import accord.primitives.Timestamp;
 import accord.utils.Timestamped;
 
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java 
b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index 2a888642..d98c2dde 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -20,11 +20,12 @@ package accord.impl.mock;
 
 import accord.NetworkFilter;
 import accord.api.MessageSink;
-import accord.coordinate.Timeout;
 import accord.impl.*;
+import accord.local.AgentExecutor;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.local.ShardDistributor;
+import accord.messages.SafeCallback;
 import accord.primitives.Ranges;
 import accord.utils.DefaultRandom;
 import accord.utils.EpochFunction;
@@ -63,7 +64,7 @@ public class MockCluster implements Network, AutoCloseable, 
Iterable<Node>
     public NetworkFilter networkFilter = new NetworkFilter();
 
     private long nextMessageId = 0;
-    Map<Long, Callback> callbacks = new ConcurrentHashMap<>();
+    Map<Long, SafeCallback> callbacks = new ConcurrentHashMap<>();
     private final EpochFunction<MockConfigurationService> onFetchTopology;
 
     private MockCluster(Builder builder)
@@ -140,7 +141,7 @@ public class MockCluster implements Network, AutoCloseable, 
Iterable<Node>
     }
 
     @Override
-    public void send(Id from, Id to, Request request, Callback callback)
+    public void send(Id from, Id to, Request request, AgentExecutor executor, 
Callback callback)
     {
         Node node = nodes.get(to);
         if (node == null)
@@ -153,7 +154,7 @@ public class MockCluster implements Network, AutoCloseable, 
Iterable<Node>
         {
             // TODO (desired, testing): more flexible timeouts
             if (callback != null)
-                callback.onFailure(to, new Timeout(null, null));
+                new SafeCallback(executor, callback).timeout(to);
             logger.info("discarding filtered message from {} to {}: {}", from, 
to, request);
             return;
         }
@@ -161,10 +162,11 @@ public class MockCluster implements Network, 
AutoCloseable, Iterable<Node>
         long messageId = nextMessageId();
         if (callback != null)
         {
-            callbacks.put(messageId, callback);
+            SafeCallback sc = new SafeCallback(executor, callback);
+            callbacks.put(messageId, sc);
             node.scheduler().once(() -> {
-                if (callbacks.remove(messageId, callback))
-                    callback.onFailure(to, new Timeout(null, null));
+                if (callbacks.remove(messageId, sc))
+                    sc.timeout(to);
                 }, 2L, TimeUnit.SECONDS);
         }
 
@@ -182,17 +184,17 @@ public class MockCluster implements Network, 
AutoCloseable, Iterable<Node>
             return;
         }
 
-        Callback callback = callbacks.remove(replyingToMessage);
+        SafeCallback sc = callbacks.remove(replyingToMessage);
 
         if (networkFilter.shouldDiscard(from, replyingToNode, reply))
         {
             logger.info("discarding filtered reply from {} to {}: {}", from, 
reply, reply);
-            if (callback != null)
-                callback.onFailure(from, new Timeout(null, null));
+            if (sc != null)
+                sc.timeout(from);
             return;
         }
 
-        if (callback == null)
+        if (sc == null)
         {
             logger.warn("Callback not found for reply from {} to {}: {} 
(msgid: {})", from, replyingToNode, reply, replyingToMessage);
             return;
@@ -202,11 +204,11 @@ public class MockCluster implements Network, 
AutoCloseable, Iterable<Node>
         node.scheduler().now(() -> {
             try
             {
-                callback.onSuccess(from, reply);
+                sc.success(from, reply);
             }
             catch (Throwable t)
             {
-                callback.onCallbackFailure(from, t);
+                sc.failure(from, t);
             }
         });
     }
diff --git a/accord-core/src/test/java/accord/impl/mock/MockStore.java 
b/accord-core/src/test/java/accord/impl/mock/MockStore.java
index 66e91e18..0a61b415 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockStore.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockStore.java
@@ -28,10 +28,8 @@ import accord.api.Write;
 import accord.local.SafeCommandStore;
 import accord.primitives.*;
 import accord.primitives.Ranges;
-import accord.primitives.Keys;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
-import accord.primitives.*;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
 
diff --git a/accord-core/src/test/java/accord/impl/mock/Network.java 
b/accord-core/src/test/java/accord/impl/mock/Network.java
index 4f64acc5..afa5e541 100644
--- a/accord-core/src/test/java/accord/impl/mock/Network.java
+++ b/accord-core/src/test/java/accord/impl/mock/Network.java
@@ -18,6 +18,7 @@
 
 package accord.impl.mock;
 
+import accord.local.AgentExecutor;
 import accord.local.Node.Id;
 import accord.messages.Callback;
 import accord.messages.Reply;
@@ -46,13 +47,13 @@ public interface Network
         return new MessageId(messageId);
     }
 
-    void send(Id from, Id to, Request request, Callback callback);
+    void send(Id from, Id to, Request request, AgentExecutor executor, 
Callback callback);
     void reply(Id from, Id replyingToNode, long replyingToMessage, Reply 
reply);
 
     Network BLACK_HOLE = new Network()
     {
         @Override
-        public void send(Id from, Id to, Request request, Callback callback)
+        public void send(Id from, Id to, Request request, AgentExecutor 
executor, Callback callback)
         {
             // TODO (easy, testing): log
         }
diff --git 
a/accord-core/src/test/java/accord/impl/mock/RecordingMessageSink.java 
b/accord-core/src/test/java/accord/impl/mock/RecordingMessageSink.java
index c6cfb90e..ef00df85 100644
--- a/accord-core/src/test/java/accord/impl/mock/RecordingMessageSink.java
+++ b/accord-core/src/test/java/accord/impl/mock/RecordingMessageSink.java
@@ -18,6 +18,7 @@
 
 package accord.impl.mock;
 
+import accord.local.AgentExecutor;
 import accord.local.Node;
 import accord.messages.Callback;
 import accord.messages.Reply;
@@ -62,10 +63,10 @@ public class RecordingMessageSink extends SimpleMessageSink
     }
 
     @Override
-    public void send(Node.Id to, Request request, Callback callback)
+    public void send(Node.Id to, Request request, AgentExecutor executor, 
Callback callback)
     {
         requests.add(new Envelope<>(to, request, callback));
-        super.send(to, request, callback);
+        super.send(to, request, executor, callback);
     }
 
     @Override
diff --git a/accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java 
b/accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java
index cdf9dc3d..e33314d2 100644
--- a/accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java
+++ b/accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java
@@ -18,6 +18,7 @@
 
 package accord.impl.mock;
 
+import accord.local.AgentExecutor;
 import accord.local.Node;
 import accord.api.MessageSink;
 import accord.messages.Callback;
@@ -39,13 +40,13 @@ public class SimpleMessageSink implements MessageSink
     @Override
     public void send(Node.Id to, Request request)
     {
-        network.send(node, to, request, null);
+        network.send(node, to, request, null, null);
     }
 
     @Override
-    public void send(Node.Id to, Request request, Callback callback)
+    public void send(Node.Id to, Request request, AgentExecutor executor, 
Callback callback)
     {
-        network.send(node, to, request, callback);
+        network.send(node, to, request, executor, callback);
     }
 
     @Override
diff --git a/accord-core/src/test/java/accord/utils/MessageTask.java 
b/accord-core/src/test/java/accord/utils/MessageTask.java
index d868049c..d9084d10 100644
--- a/accord-core/src/test/java/accord/utils/MessageTask.java
+++ b/accord-core/src/test/java/accord/utils/MessageTask.java
@@ -18,6 +18,7 @@
 
 package accord.utils;
 
+import accord.local.AgentExecutor;
 import accord.local.Node;
 import accord.messages.*;
 import accord.utils.async.AsyncResults;
@@ -70,6 +71,7 @@ public class MessageTask extends 
AsyncResults.SettableResult<Void> implements Ru
     private final List<Node.Id> recipients;
     private final String desc;
     private final Request request;
+    private final AgentExecutor executor;
     private final RetryingCallback callback;
 
     private class TaskRequest implements Request
@@ -117,7 +119,7 @@ public class MessageTask extends 
AsyncResults.SettableResult<Void> implements Ru
             Invariants.checkArgument(reply == SUCCESS || reply == FAILURE);
             if (reply == FAILURE)
             {
-                originator.send(from, request, this);
+                originator.send(from, request, executor, this);
                 return;
             }
 
@@ -132,7 +134,7 @@ public class MessageTask extends 
AsyncResults.SettableResult<Void> implements Ru
         @Override
         public void onFailure(Node.Id from, Throwable failure)
         {
-            originator.send(from, request, this);
+            originator.send(from, request, executor, this);
         }
 
         @Override
@@ -144,7 +146,7 @@ public class MessageTask extends 
AsyncResults.SettableResult<Void> implements Ru
 
     private MessageTask(Node originator,
                         List<Node.Id> recipients,
-                        String desc, NodeProcess process)
+                        AgentExecutor executor, String desc, NodeProcess 
process)
     {
         Invariants.checkArgument(!recipients.isEmpty());
         this.originator = originator;
@@ -152,31 +154,30 @@ public class MessageTask extends 
AsyncResults.SettableResult<Void> implements Ru
         this.desc = desc;
         this.request = new TaskRequest(process, desc);
         this.callback = new RetryingCallback(recipients);
+        this.executor = executor;
     }
 
-    public static MessageTask of(Node originator, Collection<Node.Id> 
recipients, String desc, NodeProcess process)
+    private static MessageTask of(Node originator, Collection<Node.Id> 
recipients, AgentExecutor executor, String desc, NodeProcess process)
     {
-        return new MessageTask(originator, new ArrayList<>(recipients), desc, 
process);
+        return new MessageTask(originator, new ArrayList<>(recipients), 
executor, desc, process);
     }
 
-    public static MessageTask begin(Node originator, Collection<Node.Id> 
recipients, String desc, NodeProcess process)
+    public static MessageTask begin(Node originator, Collection<Node.Id> 
recipients, AgentExecutor executor, String desc, NodeProcess process)
     {
-        MessageTask task = of(originator, recipients, desc, process);
+        MessageTask task = of(originator, recipients, executor, desc, process);
         task.run();
         return task;
     }
 
-    public static MessageTask of(Node originator, Collection<Node.Id> 
recipients, String desc, BiConsumer<Node, Consumer<Boolean>> consumer)
+    public static MessageTask of(Node originator, Collection<Node.Id> 
recipients, AgentExecutor executor, String desc, BiConsumer<Node, 
Consumer<Boolean>> consumer)
     {
-        NodeProcess process = (node, from, onDone) -> {
-            consumer.accept(node, onDone);
-        };
-        return of(originator, recipients, desc, process);
+        NodeProcess process = (node, from, onDone) -> consumer.accept(node, 
onDone);
+        return of(originator, recipients, executor, desc, process);
     }
 
-    public static MessageTask apply(Node originator, Collection<Node.Id> 
recipients, String desc, NodeProcess process)
+    public static MessageTask apply(Node originator, Collection<Node.Id> 
recipients, AgentExecutor executor, String desc, NodeProcess process)
     {
-        MessageTask task = of(originator, recipients, desc, process);
+        MessageTask task = of(originator, recipients, executor, desc, process);
         task.run();
         return task;
     }
@@ -184,7 +185,7 @@ public class MessageTask extends 
AsyncResults.SettableResult<Void> implements Ru
     @Override
     public void run()
     {
-        originator.send(recipients, request, callback);
+        originator.send(recipients, request, executor, callback);
     }
 
     @Override
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java 
b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index 74de7eb1..31799dff 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -39,15 +39,16 @@ import java.util.function.Function;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
-import accord.coordinate.Timeout;
 import accord.impl.SizeOfIntersectionSorter;
 import accord.impl.SimpleProgressLog;
 import accord.impl.InMemoryCommandStores;
+import accord.local.AgentExecutor;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.api.MessageSink;
 import accord.local.ShardDistributor;
 import accord.messages.Callback;
+import accord.messages.SafeCallback;
 import accord.messages.Reply;
 import accord.messages.ReplyContext;
 import accord.messages.Request;
@@ -79,7 +80,7 @@ public class Cluster implements Scheduler
         final RandomSource random;
 
         int nextMessageId = 0;
-        Map<Long, Callback> callbacks = new LinkedHashMap<>();
+        Map<Long, SafeCallback> callbacks = new LinkedHashMap<>();
 
         public InstanceSink(Id self, Function<Id, Node> lookup, Cluster 
parent, RandomSource random)
         {
@@ -96,14 +97,15 @@ public class Cluster implements Scheduler
         }
 
         @Override
-        public void send(Id to, Request send, Callback callback)
+        public void send(Id to, Request send, AgentExecutor executor, Callback 
callback)
         {
             long messageId = nextMessageId++;
-            callbacks.put(messageId, callback);
+            SafeCallback sc = new SafeCallback(executor, callback);
+            callbacks.put(messageId, sc);
             parent.add(self, to, messageId, send);
             parent.pending.add((Runnable)() -> {
-                if (callback == callbacks.remove(messageId))
-                    callback.onFailure(to, new Timeout(null, null));
+                if (sc == callbacks.remove(messageId))
+                    sc.timeout(to);
             }, 1000 + random.nextInt(10000), TimeUnit.MILLISECONDS);
         }
 
@@ -201,18 +203,9 @@ public class Cluster implements Scheduler
                     if (deliver.body.in_reply_to > Body.SENTINEL_MSG_ID || 
body instanceof Reply)
                     {
                         Reply reply = (Reply) body;
-                        Callback callback = 
sinks.get(deliver.dest).callbacks.remove(deliver.body.in_reply_to);
+                        SafeCallback callback = 
sinks.get(deliver.dest).callbacks.remove(deliver.body.in_reply_to);
                         if (callback != null)
-                            on.scheduler().now(() -> {
-                                try
-                                {
-                                    callback.onSuccess(deliver.src, reply);
-                                }
-                                catch (Throwable t)
-                                {
-                                    callback.onCallbackFailure(deliver.src, t);
-                                }
-                            });
+                            callback.success(deliver.src, reply);
                     }
                     else on.receive((Request) body, deliver.src, deliver);
             }
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Json.java 
b/accord-maelstrom/src/main/java/accord/maelstrom/Json.java
index 63b757c3..b32a6a7c 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Json.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Json.java
@@ -477,12 +477,15 @@ public class Json
         public void write(JsonWriter out, ReadOk value) throws IOException
         {
             out.beginArray();
-            for (Map.Entry<Key, Value> e : 
((MaelstromData)value.data).entrySet())
+            if (value.data != null)
             {
-                out.beginArray();
-                ((MaelstromKey)e.getKey()).datum.write(out);
-                e.getValue().write(out);
-                out.endArray();
+                for (Map.Entry<Key, Value> e : 
((MaelstromData)value.data).entrySet())
+                {
+                    out.beginArray();
+                    ((MaelstromKey)e.getKey()).datum.write(out);
+                    e.getValue().write(out);
+                    out.endArray();
+                }
             }
             out.endArray();
         }
diff --git 
a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromQuery.java 
b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromQuery.java
index 34939ddc..418deff4 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromQuery.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromQuery.java
@@ -28,7 +28,6 @@ import accord.api.Data;
 import accord.api.Key;
 import accord.api.Query;
 import accord.api.Result;
-import accord.primitives.Keys;
 import accord.primitives.TxnId;
 
 public class MaelstromQuery implements Query
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java 
b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
index 955645c8..1df8f9dc 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
@@ -34,6 +34,7 @@ import accord.coordinate.Timeout;
 import accord.impl.SimpleProgressLog;
 import accord.impl.InMemoryCommandStores;
 import accord.impl.SizeOfIntersectionSorter;
+import accord.local.AgentExecutor;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.api.Scheduler;
@@ -112,8 +113,9 @@ public class Main
         }
 
         @Override
-        public void send(Id to, Request send, Callback callback)
+        public void send(Id to, Request send, AgentExecutor ignored, Callback 
callback)
         {
+            // Executor is ignored due to the fact callbacks are applied in a 
single thread already
             long messageId = nextMessageId.incrementAndGet();
             callbacks.put(messageId, new CallbackInfo(callback, to, 
nowSupplier.getAsLong() + 1000L));
             send(new Packet(self, to, messageId, send));
diff --git a/buildSrc/src/main/groovy/accord.java-conventions.gradle 
b/buildSrc/src/main/groovy/accord.java-conventions.gradle
index 5817cc9c..34b89a46 100644
--- a/buildSrc/src/main/groovy/accord.java-conventions.gradle
+++ b/buildSrc/src/main/groovy/accord.java-conventions.gradle
@@ -18,6 +18,7 @@
 
 plugins {
     id 'java'
+    id 'checkstyle'
 }
 
 group   accord_group
@@ -33,6 +34,11 @@ compileJava {
     dependsOn(':rat')
 }
 
+checkstyle {
+  showViolations = true
+  configDirectory = file("${rootProject.projectDir}/.build/checkstyle")
+}
+
 test {
     useJUnitPlatform()
     // Use max(cpu/2, 1) workers to run tests


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to