This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new d55c9f6cbd CEP-15: (C*) Accord message processing should avoid being
passed on to a Stage and run directly in the messageing handler
d55c9f6cbd is described below
commit d55c9f6cbdc77bacd59e5e51a830aab8758406ef
Author: David Capwell <[email protected]>
AuthorDate: Thu Apr 27 11:30:50 2023 -0700
CEP-15: (C*) Accord message processing should avoid being passed on to a
Stage and run directly in the messageing handler
patch by David Capwell; reviewed by Ariel Weisberg, Benedict Elliott Smith
for CASSANDRA-18364
---
.build/build-accord.xml | 4 +-
.../pre-commit/100-verify-submodules-pushed.sh | 1 -
modules/accord | 2 +-
.../org/apache/cassandra/concurrent/Stage.java | 1 -
src/java/org/apache/cassandra/config/Config.java | 4 +-
.../cassandra/config/DatabaseDescriptor.java | 30 ++++-----
.../cassandra/config/OptionaldPositiveInt.java | 73 ++++++++++++++++++++++
.../cassandra/config/YamlConfigurationLoader.java | 26 +++++---
src/java/org/apache/cassandra/net/Verb.java | 30 ++++-----
.../cassandra/service/accord/AccordCallback.java | 21 ++++---
.../service/accord/AccordCommandStore.java | 33 +++-------
.../service/accord/AccordCommandStores.java | 5 +-
.../service/accord/AccordMessageSink.java | 23 ++++---
.../cassandra/service/accord/AccordService.java | 8 +--
.../service/accord/async/AsyncOperation.java | 2 +-
.../accord/serializers/ReadDataSerializers.java | 6 +-
.../cassandra/service/accord/txn/TxnData.java | 3 +
.../org/apache/cassandra/utils/FBUtilities.java | 9 +--
.../cassandra/simulator/ClusterSimulation.java | 7 +--
.../AbstractPairOfSequencesPaxosSimulation.java | 4 +-
.../cassandra/simulator/paxos/PaxosSimulation.java | 32 ++++++++--
.../simulator/systems/SimulatedAction.java | 5 +-
.../config/DatabaseDescriptorRefTest.java | 1 +
.../config/YamlConfigurationLoaderTest.java | 51 +++++++++++++++
.../service/accord/AccordMessageSinkTest.java | 3 +-
25 files changed, 264 insertions(+), 120 deletions(-)
diff --git a/.build/build-accord.xml b/.build/build-accord.xml
index eba85912d5..6fc716d2d0 100644
--- a/.build/build-accord.xml
+++ b/.build/build-accord.xml
@@ -27,8 +27,10 @@
<arg value="publishToMavenLocal" />
<arg value="-x" />
<arg value="test" />
- <arg value="-x" />
+ <!-- since so much development is done from this hook, by adding
checkstyle and rat will avoid issues earlier -->
<arg value="rat" />
+ <arg value="checkstyleMain" />
+ <arg value="checkstyleTest" />
<arg value="-Paccord_group=org.apache.cassandra" />
<arg value="-Paccord_artifactId=cassandra-accord" />
<arg value="-Paccord_version=${version}" />
diff --git a/.build/git/git-hooks/pre-commit/100-verify-submodules-pushed.sh
b/.build/git/git-hooks/pre-commit/100-verify-submodules-pushed.sh
index c54099ac0f..aee8f658a1 100755
--- a/.build/git/git-hooks/pre-commit/100-verify-submodules-pushed.sh
+++ b/.build/git/git-hooks/pre-commit/100-verify-submodules-pushed.sh
@@ -84,7 +84,6 @@ _main() {
_log "\t\tgit config --local
cassandra.pre-commit.verify-submodules.enabled false"
_log "\tOr"
_log "\t\tgit config --local
cassandra.pre-commit.verify-submodule-${file}.enabled false"
- set -x
git_sub_dir="${file}/.git"
branch="$(git config -f .gitmodules "submodule.${file}.branch")"
[[ -z "${branch:-}" ]] && error "Submodule ${file} does not define a
branch"
diff --git a/modules/accord b/modules/accord
index 08aaab6e33..8226b2d775 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 08aaab6e33d43406e0649146144e4df67648602a
+Subproject commit 8226b2d7759319d7a0b0c823ab09b4344c5423f7
diff --git a/src/java/org/apache/cassandra/concurrent/Stage.java
b/src/java/org/apache/cassandra/concurrent/Stage.java
index 4a7552ab24..992c0c54f0 100644
--- a/src/java/org/apache/cassandra/concurrent/Stage.java
+++ b/src/java/org/apache/cassandra/concurrent/Stage.java
@@ -46,7 +46,6 @@ public enum Stage
MUTATION (true, "MutationStage", "request",
DatabaseDescriptor::getConcurrentWriters,
DatabaseDescriptor::setConcurrentWriters,
Stage::multiThreadedLowSignalStage),
COUNTER_MUTATION (true, "CounterMutationStage", "request",
DatabaseDescriptor::getConcurrentCounterWriters,
DatabaseDescriptor::setConcurrentCounterWriters,
Stage::multiThreadedLowSignalStage),
VIEW_MUTATION (true, "ViewMutationStage", "request",
DatabaseDescriptor::getConcurrentViewWriters,
DatabaseDescriptor::setConcurrentViewWriters,
Stage::multiThreadedLowSignalStage),
- ACCORD (true, "AccordStage", "request",
DatabaseDescriptor::getConcurrentAccordOps,
DatabaseDescriptor::setConcurrentAccordOps,
Stage::multiThreadedLowSignalStage),
GOSSIP (true, "GossipStage", "internal", () -> 1,
null,
Stage::singleThreadedStage),
REQUEST_RESPONSE (false, "RequestResponseStage", "request",
FBUtilities::getAvailableProcessors, null,
Stage::multiThreadedLowSignalStage),
ANTI_ENTROPY (false, "AntiEntropyStage", "internal", () -> 1,
null,
Stage::singleThreadedStage),
diff --git a/src/java/org/apache/cassandra/config/Config.java
b/src/java/org/apache/cassandra/config/Config.java
index fbba2facf3..b674ceb92a 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -171,10 +171,9 @@ public class Config
public int concurrent_reads = 32;
public int concurrent_writes = 32;
- public int concurrent_accord_operations = 32;
public int concurrent_counter_writes = 32;
public int concurrent_materialized_view_writes = 32;
- public int available_processors = -1;
+ public OptionaldPositiveInt available_processors = new
OptionaldPositiveInt(Integer.getInteger("cassandra.available_processors",
OptionaldPositiveInt.UNDEFINED_VALUE));
@Deprecated
public Integer concurrent_replicates = null;
@@ -596,6 +595,7 @@ public class Config
public volatile boolean use_statements_enabled = true;
public boolean accord_transactions_enabled = false;
+ public OptionaldPositiveInt accord_shard_count =
OptionaldPositiveInt.UNDEFINED;
/**
* Optionally disable asynchronous UDF execution.
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index a48b034568..ee8d12b236 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -519,9 +519,6 @@ public class DatabaseDescriptor
if (conf.concurrent_counter_writes < 2)
throw new ConfigurationException("concurrent_counter_writes must
be at least 2, but was " + conf.concurrent_counter_writes, false);
- if (conf.concurrent_accord_operations < 1)
- throw new ConfigurationException("concurrent_accord_operations
must be at least 1, but was " + conf.concurrent_accord_operations, false);
-
if (conf.concurrent_replicates != null)
logger.warn("concurrent_replicates has been deprecated and should
be removed from cassandra.yaml");
@@ -2092,20 +2089,6 @@ public class DatabaseDescriptor
conf.concurrent_materialized_view_writes =
concurrent_materialized_view_writes;
}
- public static int getConcurrentAccordOps()
- {
- return conf.concurrent_accord_operations;
- }
-
- public static void setConcurrentAccordOps(int concurrent_operations)
- {
- if (concurrent_operations < 0)
- {
- throw new IllegalArgumentException("Concurrent accord operations
must be non-negative");
- }
- conf.concurrent_accord_operations = concurrent_operations;
- }
-
public static int getFlushWriters()
{
return conf.memtable_flush_writers;
@@ -2113,7 +2096,13 @@ public class DatabaseDescriptor
public static int getAvailableProcessors()
{
- return conf == null ? -1 : conf.available_processors;
+ OptionaldPositiveInt ap = conf == null ?
OptionaldPositiveInt.UNDEFINED : conf.available_processors;
+ return ap.or(Runtime.getRuntime()::availableProcessors);
+ }
+
+ public static void setAvailableProcessors(int value)
+ {
+ conf.available_processors = new OptionaldPositiveInt(value);
}
public static int getConcurrentCompactors()
@@ -4492,6 +4481,11 @@ public class DatabaseDescriptor
conf.accord_transactions_enabled = b;
}
+ public static int getAccordShardCount()
+ {
+ return
conf.accord_shard_count.or(DatabaseDescriptor::getAvailableProcessors);
+ }
+
public static boolean getForceNewPreparedStatementBehaviour()
{
return conf.force_new_prepared_statement_behaviour;
diff --git a/src/java/org/apache/cassandra/config/OptionaldPositiveInt.java
b/src/java/org/apache/cassandra/config/OptionaldPositiveInt.java
new file mode 100644
index 0000000000..ea33b7af98
--- /dev/null
+++ b/src/java/org/apache/cassandra/config/OptionaldPositiveInt.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.config;
+
+import java.util.Objects;
+import java.util.function.IntSupplier;
+
+public class OptionaldPositiveInt
+{
+ public static final int UNDEFINED_VALUE = -1;
+ public static final OptionaldPositiveInt UNDEFINED = new
OptionaldPositiveInt(UNDEFINED_VALUE);
+
+ private final int value;
+
+ public OptionaldPositiveInt(int value)
+ {
+ if (!(value == -1 || value >= 1))
+ throw new IllegalArgumentException(String.format("Only -1
(undefined) and positive values are allowed; given %d", value));
+ this.value = value;
+ }
+
+ public boolean isDefined()
+ {
+ return value != UNDEFINED_VALUE;
+ }
+
+ public int or(int defaultValue)
+ {
+ return value == UNDEFINED_VALUE ? defaultValue : value;
+ }
+
+ public int or(IntSupplier defaultValue)
+ {
+ return value == UNDEFINED_VALUE ? defaultValue.getAsInt() : value;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ OptionaldPositiveInt that = (OptionaldPositiveInt) o;
+ return value == that.value;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(value);
+ }
+
+ @Override
+ public String toString()
+ {
+ return !isDefined() ? "null" : Integer.toString(value);
+ }
+}
diff --git a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
index 528accdb74..7606e86024 100644
--- a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
+++ b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
@@ -133,16 +133,7 @@ public class YamlConfigurationLoader implements
ConfigurationLoader
throw new AssertionError(e);
}
- SafeConstructor constructor = new CustomConstructor(Config.class,
Yaml.class.getClassLoader());
- Map<Class<?>, Map<String, Replacement>> replacements =
getNameReplacements(Config.class);
- verifyReplacements(replacements, configBytes);
- PropertiesChecker propertiesChecker = new
PropertiesChecker(replacements);
- constructor.setPropertyUtils(propertiesChecker);
- Yaml yaml = new Yaml(constructor);
- Config result = loadConfig(yaml, configBytes);
- propertiesChecker.check();
- maybeAddSystemProperties(result);
- return result;
+ return loadConfig(configBytes);
}
catch (YAMLException e)
{
@@ -150,6 +141,21 @@ public class YamlConfigurationLoader implements
ConfigurationLoader
}
}
+ @VisibleForTesting
+ static Config loadConfig(byte[] configBytes)
+ {
+ SafeConstructor constructor = new CustomConstructor(Config.class,
Yaml.class.getClassLoader());
+ Map<Class<?>, Map<String, Replacement>> replacements =
getNameReplacements(Config.class);
+ verifyReplacements(replacements, configBytes);
+ PropertiesChecker propertiesChecker = new
PropertiesChecker(replacements);
+ constructor.setPropertyUtils(propertiesChecker);
+ Yaml yaml = new Yaml(constructor);
+ Config result = loadConfig(yaml, configBytes);
+ propertiesChecker.check();
+ maybeAddSystemProperties(result);
+ return result;
+ }
+
private static void maybeAddSystemProperties(Object obj)
{
if
(CassandraRelevantProperties.CONFIG_ALLOW_SYSTEM_PROPERTIES.getBoolean())
diff --git a/src/java/org/apache/cassandra/net/Verb.java
b/src/java/org/apache/cassandra/net/Verb.java
index 15584b7e8f..781261e655 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -230,29 +230,29 @@ public enum Verb
// accord
ACCORD_SIMPLE_RSP (119, P2, writeTimeout, REQUEST_RESPONSE,
() -> EnumSerializer.simpleReply, RESPONSE_HANDLER
),
ACCORD_PREACCEPT_RSP (121, P2, writeTimeout, REQUEST_RESPONSE,
() -> PreacceptSerializers.reply, RESPONSE_HANDLER
),
- ACCORD_PREACCEPT_REQ (120, P2, writeTimeout, ACCORD,
() -> PreacceptSerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_PREACCEPT_RSP ),
+ ACCORD_PREACCEPT_REQ (120, P2, writeTimeout, IMMEDIATE,
() -> PreacceptSerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_PREACCEPT_RSP ),
ACCORD_ACCEPT_RSP (124, P2, writeTimeout, REQUEST_RESPONSE,
() -> AcceptSerializers.reply, RESPONSE_HANDLER
),
- ACCORD_ACCEPT_REQ (122, P2, writeTimeout, ACCORD,
() -> AcceptSerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_ACCEPT_RSP ),
- ACCORD_ACCEPT_INVALIDATE_REQ (123, P2, writeTimeout, ACCORD,
() -> AcceptSerializers.invalidate, () ->
AccordService.instance().verbHandler(), ACCORD_ACCEPT_RSP ),
+ ACCORD_ACCEPT_REQ (122, P2, writeTimeout, IMMEDIATE,
() -> AcceptSerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_ACCEPT_RSP ),
+ ACCORD_ACCEPT_INVALIDATE_REQ (123, P2, writeTimeout, IMMEDIATE,
() -> AcceptSerializers.invalidate, () ->
AccordService.instance().verbHandler(), ACCORD_ACCEPT_RSP ),
ACCORD_READ_RSP (128, P2, writeTimeout, REQUEST_RESPONSE,
() -> ReadDataSerializers.reply, RESPONSE_HANDLER
),
- ACCORD_READ_REQ (127, P2, writeTimeout, ACCORD,
() -> ReadDataSerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_READ_RSP ),
- ACCORD_COMMIT_REQ (125, P2, writeTimeout, ACCORD,
() -> CommitSerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_READ_RSP ),
- ACCORD_COMMIT_INVALIDATE_REQ (126, P2, writeTimeout, ACCORD,
() -> CommitSerializers.invalidate, () ->
AccordService.instance().verbHandler() ),
+ ACCORD_READ_REQ (127, P2, writeTimeout, IMMEDIATE,
() -> ReadDataSerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_READ_RSP ),
+ ACCORD_COMMIT_REQ (125, P2, writeTimeout, IMMEDIATE,
() -> CommitSerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_READ_RSP ),
+ ACCORD_COMMIT_INVALIDATE_REQ (126, P2, writeTimeout, IMMEDIATE,
() -> CommitSerializers.invalidate, () ->
AccordService.instance().verbHandler() ),
ACCORD_APPLY_RSP (130, P2, writeTimeout, REQUEST_RESPONSE,
() -> ApplySerializers.reply, RESPONSE_HANDLER
),
- ACCORD_APPLY_REQ (129, P2, writeTimeout, ACCORD,
() -> ApplySerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_APPLY_RSP ),
+ ACCORD_APPLY_REQ (129, P2, writeTimeout, IMMEDIATE,
() -> ApplySerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_APPLY_RSP ),
ACCORD_RECOVER_RSP (132, P2, writeTimeout, REQUEST_RESPONSE,
() -> RecoverySerializers.reply, RESPONSE_HANDLER
),
- ACCORD_RECOVER_REQ (131, P2, writeTimeout, ACCORD,
() -> RecoverySerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_RECOVER_RSP ),
+ ACCORD_RECOVER_REQ (131, P2, writeTimeout, IMMEDIATE,
() -> RecoverySerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_RECOVER_RSP ),
ACCORD_BEGIN_INVALIDATE_RSP (134, P2, writeTimeout, REQUEST_RESPONSE,
() -> BeginInvalidationSerializers.reply, RESPONSE_HANDLER
),
- ACCORD_BEGIN_INVALIDATE_REQ (133, P2, writeTimeout, ACCORD,
() -> BeginInvalidationSerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_BEGIN_INVALIDATE_RSP ),
+ ACCORD_BEGIN_INVALIDATE_REQ (133, P2, writeTimeout, IMMEDIATE,
() -> BeginInvalidationSerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_BEGIN_INVALIDATE_RSP ),
ACCORD_WAIT_COMMIT_RSP (136, P2, writeTimeout, REQUEST_RESPONSE,
() -> WaitOnCommitSerializer.reply, RESPONSE_HANDLER
),
- ACCORD_WAIT_COMMIT_REQ (135, P2, writeTimeout, ACCORD,
() -> WaitOnCommitSerializer.request, () ->
AccordService.instance().verbHandler(), ACCORD_WAIT_COMMIT_RSP ),
- ACCORD_INFORM_OF_TXNID_REQ (137, P2, writeTimeout, ACCORD,
() -> InformOfTxnIdSerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP ),
- ACCORD_INFORM_HOME_DURABLE_REQ (138, P2, writeTimeout, ACCORD,
() -> InformHomeDurableSerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP ),
- ACCORD_INFORM_DURABLE_REQ (139, P2, writeTimeout, ACCORD,
() -> InformDurableSerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP ),
+ ACCORD_WAIT_COMMIT_REQ (135, P2, writeTimeout, IMMEDIATE,
() -> WaitOnCommitSerializer.request, () ->
AccordService.instance().verbHandler(), ACCORD_WAIT_COMMIT_RSP ),
+ ACCORD_INFORM_OF_TXNID_REQ (137, P2, writeTimeout, IMMEDIATE,
() -> InformOfTxnIdSerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP ),
+ ACCORD_INFORM_HOME_DURABLE_REQ (138, P2, writeTimeout, IMMEDIATE,
() -> InformHomeDurableSerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP ),
+ ACCORD_INFORM_DURABLE_REQ (139, P2, writeTimeout, IMMEDIATE,
() -> InformDurableSerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP ),
ACCORD_CHECK_STATUS_RSP (141, P2, writeTimeout, REQUEST_RESPONSE,
() -> CheckStatusSerializers.reply, RESPONSE_HANDLER
),
- ACCORD_CHECK_STATUS_REQ (140, P2, writeTimeout, ACCORD,
() -> CheckStatusSerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_CHECK_STATUS_RSP ),
+ ACCORD_CHECK_STATUS_REQ (140, P2, writeTimeout, IMMEDIATE,
() -> CheckStatusSerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_CHECK_STATUS_RSP ),
ACCORD_GET_DEPS_RSP (143, P2, writeTimeout, REQUEST_RESPONSE,
() -> GetDepsSerializers.reply, RESPONSE_HANDLER
),
- ACCORD_GET_DEPS_REQ (142, P2, writeTimeout, ACCORD,
() -> GetDepsSerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_GET_DEPS_RSP ),
+ ACCORD_GET_DEPS_REQ (142, P2, writeTimeout, IMMEDIATE,
() -> GetDepsSerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_GET_DEPS_RSP ),
// generic failure response
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCallback.java
b/src/java/org/apache/cassandra/service/accord/AccordCallback.java
index 60b5d6988a..20ed9fad69 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCallback.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCallback.java
@@ -22,28 +22,29 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.coordinate.Timeout;
+import accord.local.AgentExecutor;
import accord.messages.Callback;
+import accord.messages.SafeCallback;
import accord.messages.Reply;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.RequestCallback;
-class AccordCallback<T extends Reply> implements RequestCallback<T>
+class AccordCallback<T extends Reply> extends SafeCallback<T> implements
RequestCallback<T>
{
private static final Logger logger =
LoggerFactory.getLogger(AccordCallback.class);
- private final Callback<T> callback;
- public AccordCallback(Callback<T> callback)
+ public AccordCallback(AgentExecutor executor, Callback<T> callback)
{
- this.callback = callback;
+ super(executor, callback);
}
@Override
public void onResponse(Message<T> msg)
{
logger.debug("Received response {} from {}", msg.payload, msg.from());
- callback.onSuccess(EndpointMapping.endpointToId(msg.from()),
msg.payload);
+ success(EndpointMapping.endpointToId(msg.from()), msg.payload);
}
private static Throwable convertReason(RequestFailureReason reason)
@@ -56,9 +57,15 @@ class AccordCallback<T extends Reply> implements
RequestCallback<T>
@Override
public void onFailure(InetAddressAndPort from, RequestFailureReason
failureReason)
{
- logger.debug("Received failure {} from {} for {}", failureReason,
from, callback);
+ logger.debug("Received failure {} from {} for {}", failureReason,
from, this);
// TODO (now): we should distinguish timeout failures with some
placeholder Exception
- callback.onFailure(EndpointMapping.endpointToId(from),
convertReason(failureReason));
+ failure(EndpointMapping.endpointToId(from),
convertReason(failureReason));
+ }
+
+ @Override
+ public boolean trackLatencyForSnitch()
+ {
+ return true;
}
@Override
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 70962298f4..1633dc7930 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -50,7 +50,7 @@ import
org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
import static
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
-public class AccordCommandStore implements CommandStore
+public class AccordCommandStore extends CommandStore
{
private static long getThreadId(ExecutorService executor)
{
@@ -68,7 +68,6 @@ public class AccordCommandStore implements CommandStore
}
}
- private final int id;
private final long threadId;
public final String loggingId;
private final ExecutorService executor;
@@ -79,12 +78,6 @@ public class AccordCommandStore implements CommandStore
private AccordSafeCommandStore current = null;
private long lastSystemTimestampMicros = Long.MIN_VALUE;
- private final NodeTimeService time;
- private final Agent agent;
- private final DataStore dataStore;
- private final ProgressLog progressLog;
- private final RangesForEpochHolder rangesForEpochHolder;
-
public AccordCommandStore(int id,
NodeTimeService time,
Agent agent,
@@ -92,24 +85,20 @@ public class AccordCommandStore implements CommandStore
ProgressLog.Factory progressLogFactory,
RangesForEpochHolder rangesForEpoch)
{
- this.id = id;
- this.time = time;
- this.agent = agent;
- this.dataStore = dataStore;
- this.progressLog = progressLogFactory.create(this);
- this.rangesForEpochHolder = rangesForEpoch;
+ super(id, time, agent, dataStore, progressLogFactory, rangesForEpoch);
this.loggingId = String.format("[%s]", id);
this.executor =
executorFactory().sequential(CommandStore.class.getSimpleName() + '[' + id +
']');
this.threadId = getThreadId(this.executor);
this.stateCache = new AccordStateCache(8<<20);
this.commandCache = stateCache.instance(TxnId.class,
accord.local.Command.class, AccordSafeCommand::new, AccordObjectSizes::command);
this.commandsForKeyCache = stateCache.instance(RoutableKey.class,
CommandsForKey.class, AccordSafeCommandsForKey::new,
AccordObjectSizes::commandsForKey);
+ executor.execute(() -> CommandStore.register(this));
}
@Override
- public int id()
+ public boolean inStore()
{
- return id;
+ return Thread.currentThread().getId() == threadId;
}
public void setCacheSize(long bytes)
@@ -125,12 +114,12 @@ public class AccordCommandStore implements CommandStore
public void checkInStoreThread()
{
- Invariants.checkState(Thread.currentThread().getId() == threadId);
+ Invariants.checkState(inStore());
}
public void checkNotInStoreThread()
{
- Invariants.checkState(Thread.currentThread().getId() != threadId);
+ Invariants.checkState(!inStore());
}
public ExecutorService executor()
@@ -197,13 +186,7 @@ public class AccordCommandStore implements CommandStore
public DataStore dataStore()
{
- return dataStore;
- }
-
- @Override
- public Agent agent()
- {
- return agent;
+ return store;
}
NodeTimeService time()
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
index 0708f092d2..45208e5c34 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
@@ -25,14 +25,15 @@ import accord.local.CommandStores;
import accord.local.NodeTimeService;
import accord.local.ShardDistributor;
import accord.topology.Topology;
+import accord.utils.RandomSource;
public class AccordCommandStores extends CommandStores<AccordCommandStore>
{
private long cacheSize;
- AccordCommandStores(NodeTimeService time, Agent agent, DataStore store,
+ AccordCommandStores(NodeTimeService time, Agent agent, DataStore store,
RandomSource random,
ShardDistributor shardDistributor, ProgressLog.Factory
progressLogFactory)
{
- super(time, agent, store, shardDistributor, progressLogFactory,
AccordCommandStore::new);
+ super(time, agent, store, random, shardDistributor,
progressLogFactory, AccordCommandStore::new);
setCacheSize(maxCacheSize());
}
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
index c7f1591e92..ff83f70756 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
@@ -20,14 +20,15 @@ package org.apache.cassandra.service.accord;
import java.util.EnumMap;
import java.util.Map;
-import java.util.Objects;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import accord.api.Agent;
import accord.api.MessageSink;
+import accord.local.AgentExecutor;
import accord.local.Node;
import accord.messages.Callback;
import accord.messages.MessageType;
@@ -93,23 +94,25 @@ public class AccordMessageSink implements MessageSink
return VerbMapping.instance.mapping.get(type);
}
+ private final Agent agent;
private final Messaging messaging;
- public AccordMessageSink(Messaging messaging)
+ public AccordMessageSink(Agent agent, Messaging messaging)
{
+ this.agent = agent;
this.messaging = messaging;
}
- public AccordMessageSink()
+ public AccordMessageSink(Agent agent)
{
- this(MessagingService.instance());
+ this(agent, MessagingService.instance());
}
@Override
public void send(Node.Id to, Request request)
{
Verb verb = getVerb(request.type());
- Objects.requireNonNull(verb, "verb");
+ Preconditions.checkNotNull(verb, "Verb is null for type %s",
request.type());
Message<Request> message = Message.out(verb, request);
InetAddressAndPort endpoint = getEndpoint(to);
logger.debug("Sending {} {} to {}", verb, message.payload, endpoint);
@@ -117,14 +120,14 @@ public class AccordMessageSink implements MessageSink
}
@Override
- public void send(Node.Id to, Request request, Callback callback)
+ public void send(Node.Id to, Request request, AgentExecutor executor,
Callback callback)
{
Verb verb = getVerb(request.type());
- Preconditions.checkArgument(verb != null);
+ Preconditions.checkNotNull(verb, "Verb is null for type %s",
request.type());
Message<Request> message = Message.out(verb, request);
InetAddressAndPort endpoint = getEndpoint(to);
logger.debug("Sending {} {} to {}", verb, message.payload, endpoint);
- messaging.sendWithCallback(message, endpoint, new
AccordCallback<>((Callback<Reply>) callback));
+ messaging.sendWithCallback(message, endpoint, new
AccordCallback<>(executor, (Callback<Reply>) callback));
}
@Override
@@ -132,7 +135,9 @@ public class AccordMessageSink implements MessageSink
{
Message<?> replyTo = (Message<?>) replyContext;
Message<?> replyMsg = replyTo.responseWith(reply);
- Preconditions.checkArgument(replyMsg.verb() == getVerb(reply.type()));
+ Verb verb = getVerb(reply.type());
+ Preconditions.checkNotNull(verb, "Verb is null for type %s",
reply.type());
+ Preconditions.checkArgument(replyMsg.verb() == verb, "Expected reply
message with verb %s but got %s; reply type was %s", replyMsg.verb(), verb,
reply.type());
InetAddressAndPort endpoint = getEndpoint(replyingToNode);
logger.debug("Replying {} {} to {}", replyMsg.verb(),
replyMsg.payload, endpoint);
messaging.send(replyMsg, endpoint);
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index a86cb70c53..d8b9f4d89b 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -61,7 +61,6 @@ import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
-import static
org.apache.cassandra.config.DatabaseDescriptor.getConcurrentAccordOps;
import static org.apache.cassandra.config.DatabaseDescriptor.getPartitioner;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
@@ -134,7 +133,8 @@ public class AccordService implements IAccordService,
Shutdownable
{
Node.Id localId =
EndpointMapping.endpointToId(FBUtilities.getBroadcastAddressAndPort());
logger.info("Starting accord with nodeId {}", localId);
- this.messageSink = new AccordMessageSink();
+ AccordAgent agent = new AccordAgent();
+ this.messageSink = new AccordMessageSink(agent);
this.configService = new AccordConfigurationService(localId);
this.scheduler = new AccordScheduler();
this.node = new Node(localId,
@@ -142,8 +142,8 @@ public class AccordService implements IAccordService,
Shutdownable
configService,
AccordService::uniqueNow,
() -> null,
- new KeyspaceSplitter(new
EvenSplit<>(getConcurrentAccordOps(), getPartitioner().accordSplitter())),
- new AccordAgent(),
+ new KeyspaceSplitter(new
EvenSplit<>(DatabaseDescriptor.getAccordShardCount(),
getPartitioner().accordSplitter())),
+ agent,
new DefaultRandom(),
scheduler,
SizeOfIntersectionSorter.SUPPLIER,
diff --git
a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
index 2f2dd2ce6d..ce5bb125f1 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
@@ -302,7 +302,7 @@ public abstract class AsyncOperation<R> extends
AsyncChains.Head<R> implements R
{
Invariants.checkArgument(this.callback == null);
this.callback = callback;
- commandStore.executor().submit(this);
+ commandStore.executor().execute(this);
}
private static Iterable<RoutableKey> toRoutableKeys(Seekables<?, ?> keys)
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
index 4899316cc5..a344028b49 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
@@ -81,7 +81,7 @@ public class ReadDataSerializers
out.writeByte(0);
ReadOk readOk = (ReadOk) reply;
- TxnData.serializer.serialize((TxnData) readOk.data, out, version);
+ TxnData.nullableSerializer.serialize((TxnData) readOk.data, out,
version);
}
@Override
@@ -91,7 +91,7 @@ public class ReadDataSerializers
if (id != 0)
return nacks[id - 1];
- return new ReadOk(TxnData.serializer.deserialize(in, version));
+ return new ReadOk(TxnData.nullableSerializer.deserialize(in,
version));
}
@Override
@@ -101,7 +101,7 @@ public class ReadDataSerializers
return TypeSizes.BYTE_SIZE;
ReadOk readOk = (ReadOk) reply;
- return TypeSizes.BYTE_SIZE +
TxnData.serializer.serializedSize((TxnData) readOk.data, version);
+ return TypeSizes.BYTE_SIZE +
TxnData.nullableSerializer.serializedSize((TxnData) readOk.data, version);
}
};
}
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
index a095d8ba78..c3d8f6e18d 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.NullableSerializer;
import org.apache.cassandra.utils.ObjectSizes;
public class TxnData implements Data, Result, Iterable<FilteredPartition>
@@ -196,4 +197,6 @@ public class TxnData implements Data, Result,
Iterable<FilteredPartition>
return size;
}
};
+
+ public static final IVersionedSerializer<TxnData> nullableSerializer =
NullableSerializer.wrap(serializer);
}
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java
b/src/java/org/apache/cassandra/utils/FBUtilities.java
index c760659405..714c2c66f4 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -134,19 +134,14 @@ public class FBUtilities
private static volatile String previousReleaseVersionString;
- private static int availableProcessors =
Integer.getInteger("cassandra.available_processors",
DatabaseDescriptor.getAvailableProcessors());
-
public static void setAvailableProcessors(int value)
{
- availableProcessors = value;
+ DatabaseDescriptor.setAvailableProcessors(value);
}
public static int getAvailableProcessors()
{
- if (availableProcessors > 0)
- return availableProcessors;
- else
- return Runtime.getRuntime().availableProcessors();
+ return DatabaseDescriptor.getAvailableProcessors();
}
public static final int MAX_UNSIGNED_SHORT = 0xFFFF;
diff --git
a/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java
b/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java
index e8809c606c..ac3a773453 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java
@@ -87,7 +87,6 @@ import org.apache.cassandra.simulator.utils.KindOfSequence;
import org.apache.cassandra.simulator.utils.LongRange;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.Closeable;
-import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.concurrent.Ref;
import org.apache.cassandra.utils.memory.BufferPool;
@@ -544,7 +543,7 @@ public class ClusterSimulation<S extends Simulation>
implements AutoCloseable
.set("concurrent_counter_writes", take(1, 4))
.set("concurrent_materialized_view_writes", take(1, 4))
.set("concurrent_reads", take(1, 4))
- .forceSet("available_processors", take(3, 4));
+ .set("available_processors", take(3, 4));
}
// begin allocating for a new node
@@ -591,7 +590,7 @@ public class ClusterSimulation<S extends Simulation>
implements AutoCloseable
if (remaining * min <= allocationPool)
return min;
if (times == remaining)
- return allocationPool / remaining;
+ return Math.max(allocationPool / remaining, min);
if (times + 1 == remaining)
return random.uniform(Math.max(min, (allocationPool - max) /
times), Math.min(max, (allocationPool - min) / times));
@@ -602,7 +601,6 @@ public class ClusterSimulation<S extends Simulation>
implements AutoCloseable
}
}
-
public final RandomSource random;
public final SimulatedSystems simulated;
public final Cluster cluster;
@@ -720,7 +718,6 @@ public class ClusterSimulation<S extends Simulation>
implements AutoCloseable
@Override
public void beforeStartup(IInstance i)
{
- ((IInvokableInstance)
i).unsafeAcceptOnThisThread(FBUtilities::setAvailableProcessors,
i.config().getInt("available_processors"));
((IInvokableInstance)
i).unsafeAcceptOnThisThread(IfInterceptibleThread::setThreadLocalRandomCheck,
(LongConsumer) threadLocalRandomCheck);
int num = i.config().num();
diff --git
a/test/simulator/main/org/apache/cassandra/simulator/paxos/AbstractPairOfSequencesPaxosSimulation.java
b/test/simulator/main/org/apache/cassandra/simulator/paxos/AbstractPairOfSequencesPaxosSimulation.java
index 5a528468ea..844a8df368 100644
---
a/test/simulator/main/org/apache/cassandra/simulator/paxos/AbstractPairOfSequencesPaxosSimulation.java
+++
b/test/simulator/main/org/apache/cassandra/simulator/paxos/AbstractPairOfSequencesPaxosSimulation.java
@@ -209,6 +209,8 @@ abstract class AbstractPairOfSequencesPaxosSimulation
extends PaxosSimulation
public Action get()
{
int[] primaryKeyIndex = consume(simulated.random, available);
+ if (primaryKeyIndex == null)
+ return Actions.empty("All primary keys are taken, try
again later");
long untilNanos = simulated.time.nanoTime() +
SECONDS.toNanos(simulateKeyForSeconds.select(simulated.random));
int concurrency =
withinKeyConcurrency.select(simulated.random);
Supplier<Action> supplier = factory.apply(simulated,
primaryKeyIndex);
@@ -249,7 +251,7 @@ abstract class AbstractPairOfSequencesPaxosSimulation
extends PaxosSimulation
private int[] consume(RandomSource random, List<Integer> available)
{
if (available.isEmpty())
- throw new AssertionError("available partitions are empty!");
+ return null;
int numPartitions = available.size() == 1 ||
!allowMultiplePartitions() ? 1 : random.uniform(1, available.size());
int[] partitions = new int[numPartitions];
for (int counter = 0; counter < numPartitions; counter++)
diff --git
a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java
b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java
index 2b600f0062..e02cd848d5 100644
---
a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java
+++
b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java
@@ -18,7 +18,9 @@
package org.apache.cassandra.simulator.paxos;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -286,19 +288,39 @@ public abstract class PaxosSimulation implements
Simulation, ClusterActionListen
private RuntimeException logAndThrow()
{
- Integer causedByPrimaryKey = null;
- Throwable causedByThrowable = null;
+ class Violation
+ {
+ final int primaryKey;
+ final Throwable cause;
+
+ Violation(int primaryKey, Throwable cause)
+ {
+ this.primaryKey = primaryKey;
+ this.cause = cause;
+ }
+ }
+ List<Violation> violations = new ArrayList<>();
for (Throwable t : simulated.failures.get())
{
+ Integer causedByPrimaryKey;
if (null != (causedByPrimaryKey = causedBy(t)))
{
- causedByThrowable = t;
+ violations.add(new Violation(causedByPrimaryKey, t));
break;
}
}
- log(causedByPrimaryKey);
- Throwable t = (causedByPrimaryKey != null) ? causedByThrowable :
simulated.failures.get().get(0);
+ if (!violations.isEmpty())
+ {
+ AssertionError error = new AssertionError("History violations
detected");
+ violations.forEach(v -> {
+ log(v.primaryKey);
+ error.addSuppressed(v.cause);
+ });
+ throw error;
+ }
+
+ Throwable t = simulated.failures.get().get(0);
Throwables.throwIfUnchecked(t);
throw new RuntimeException(t);
}
diff --git
a/test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedAction.java
b/test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedAction.java
index aa0a400b54..9fd86547b0 100644
---
a/test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedAction.java
+++
b/test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedAction.java
@@ -382,8 +382,11 @@ public abstract class SimulatedAction extends Action
implements InterceptorOfCon
notify = from;
}
boolean isTimeout = deliver != FAILURE;
+ Executor notifierExecutor = notify.executorFor(verb.id);
+ if (notifierExecutor instanceof ImmediateExecutor)
+ notifierExecutor = notify.executor();
InterceptedExecution.InterceptedTaskExecution failTask = new
InterceptedRunnableExecution(
- (InterceptingExecutor) notify.executorFor(verb.id),
+ (InterceptingExecutor) notifierExecutor,
() -> notify.unsafeApplyOnThisThread((socketAddress, id,
innerIsTimeout) -> {
InetAddressAndPort address =
InetAddressAndPort.getByAddress(socketAddress);
RequestCallbacks.CallbackInfo callback =
instance().callbacks.remove(id, address);
diff --git
a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index 50eee33393..80522706d4 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -137,6 +137,7 @@ public class DatabaseDescriptorRefTest
"org.apache.cassandra.config.GuardrailsOptions$ConsistencyLevels",
"org.apache.cassandra.config.GuardrailsOptions$TableProperties",
"org.apache.cassandra.config.ParameterizedClass",
+ "org.apache.cassandra.config.OptionaldPositiveInt",
"org.apache.cassandra.config.ReplicaFilteringProtectionOptions",
"org.apache.cassandra.config.StartupChecksOptions",
"org.apache.cassandra.config.SubnetGroups",
diff --git
a/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java
b/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java
index 06be1dc209..a48eb82ed0 100644
--- a/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java
+++ b/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java
@@ -31,6 +31,9 @@ import java.util.function.Predicate;
import com.google.common.collect.ImmutableMap;
import org.junit.Test;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.apache.cassandra.distributed.shared.WithProperties;
import org.apache.cassandra.io.util.File;
import org.yaml.snakeyaml.error.YAMLException;
@@ -359,6 +362,36 @@ public class YamlConfigurationLoaderTest
assertThat(from("sstable_preemptive_open_interval_in_mb",
-2).sstable_preemptive_open_interval).isNull();
}
+ @Test
+ public void process()
+ {
+ for (Type type : Type.values())
+ {
+ Config c = fromType(type, "available_processors", 4);
+ assertThat(c.available_processors).isEqualTo(new
OptionaldPositiveInt(4));
+
assertThat(c.accord_shard_count).isEqualTo(OptionaldPositiveInt.UNDEFINED);
+
+ c = fromType(type, "available_processors", 3,
"accord_shard_count", 1);
+ assertThat(c.available_processors).isEqualTo(new
OptionaldPositiveInt(3));
+ assertThat(c.accord_shard_count).isEqualTo(new
OptionaldPositiveInt(1));
+ }
+ }
+
+ private enum Type { MAP, YAML }
+
+ private static Config fromType(Type type, Object... values)
+ {
+ switch (type)
+ {
+ case MAP:
+ return from(values);
+ case YAML:
+ return fromYaml(values);
+ default:
+ throw new AssertionError("Unexpected type: " + type);
+ }
+ }
+
private static Config from(Object... values)
{
assert values.length % 2 == 0 : "Map can only be created with an even
number of inputs: given " + values.length;
@@ -368,6 +401,24 @@ public class YamlConfigurationLoaderTest
return YamlConfigurationLoader.fromMap(builder.build(), Config.class);
}
+ private static Config fromYaml(Object... values)
+ {
+ assert values.length % 2 == 0 : "Map can only be created with an even
number of inputs: given " + values.length;
+ ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
+ for (int i = 0; i < values.length; i += 2)
+ builder.put((String) values[i], values[i + 1]);
+ ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+ try
+ {
+ byte[] bytes = mapper.writeValueAsBytes(builder.build());
+ return YamlConfigurationLoader.loadConfig(bytes);
+ }
+ catch (JsonProcessingException e)
+ {
+ throw new AssertionError("Unable to convert map to YAML", e);
+ }
+ }
+
private static Config load(String path)
{
URL url =
YamlConfigurationLoaderTest.class.getClassLoader().getResource(path);
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java
index 82394eaed0..a1657bb38f 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.service.accord;
import org.junit.BeforeClass;
import org.junit.Test;
+import accord.api.Agent;
import accord.local.Node;
import accord.messages.InformOfTxnId;
import accord.messages.SimpleReply;
@@ -48,7 +49,7 @@ public class AccordMessageSinkTest
SimpleReply reply = SimpleReply.Ok;
Messaging messaging = Mockito.mock(Messaging.class);
- AccordMessageSink sink = new AccordMessageSink(messaging);
+ AccordMessageSink sink = new
AccordMessageSink(Mockito.mock(Agent.class), messaging);
sink.reply(new Node.Id(1), req, reply);
Mockito.verify(messaging).send(Mockito.any(), Mockito.any());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]