This is an automated email from the ASF dual-hosted git repository. jonmeredith pushed a commit to branch cassandra-4.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 4034fbb6dd4b9026b5227458a63c82e7e8075459 Merge: d13b3ef61b c6d7d070c5 Author: Jon Meredith <jonmered...@apache.org> AuthorDate: Thu Aug 24 16:32:51 2023 -0600 Merge branch 'cassandra-3.11' into cassandra-4.0 build.xml | 5 +- .../utils/RMIClientSocketFactoryImpl.java | 22 +++++- .../apache/cassandra/utils/ReflectionUtils.java | 46 +++++++++++- .../cassandra/distributed/impl/Instance.java | 5 +- .../cassandra/distributed/impl/IsolatedJmx.java | 87 +++++++++------------- .../cassandra/distributed/shared/ClusterUtils.java | 34 +++++++-- .../distributed/test/ResourceLeakTest.java | 54 ++++++++------ .../distributed/test/jmx/JMXFeatureTest.java | 85 ++++++++++++++------- .../distributed/test/jmx/JMXGetterCheckTest.java | 46 +++++++++--- 9 files changed, 262 insertions(+), 122 deletions(-) diff --cc build.xml index 0b914175bb,70a195f98d..5812360e27 --- a/build.xml +++ b/build.xml @@@ -176,111 -160,6 +176,112 @@@ } </script> + <condition property="java.version.8"> + <equals arg1="${ant.java.version}" arg2="1.8"/> + </condition> + <condition property="java.version.11"> + <not><isset property="java.version.8"/></not> + </condition> + <fail><condition><not><or> + <isset property="java.version.8"/> + <isset property="java.version.11"/> + </or></not></condition></fail> + + <resources id="_jvm11_arg_items"> + <string>-Djdk.attach.allowAttachSelf=true</string> + + <string>-XX:+UseConcMarkSweepGC</string> + <string>-XX:+CMSParallelRemarkEnabled</string> + <string>-XX:SurvivorRatio=8</string> + <string>-XX:MaxTenuringThreshold=1</string> + <string>-XX:CMSInitiatingOccupancyFraction=75</string> + <string>-XX:+UseCMSInitiatingOccupancyOnly</string> + <string>-XX:CMSWaitDuration=10000</string> + <string>-XX:+CMSParallelInitialMarkEnabled</string> + <string>-XX:+CMSEdenChunksRecordAlways</string> + + <string>--add-exports java.base/jdk.internal.misc=ALL-UNNAMED</string> + <string>--add-exports java.base/jdk.internal.ref=ALL-UNNAMED</string> + <string>--add-exports java.base/sun.nio.ch=ALL-UNNAMED</string> + <string>--add-exports java.management.rmi/com.sun.jmx.remote.internal.rmi=ALL-UNNAMED</string> + <string>--add-exports java.rmi/sun.rmi.registry=ALL-UNNAMED</string> + <string>--add-exports java.rmi/sun.rmi.server=ALL-UNNAMED</string> ++ <string>--add-exports java.rmi/sun.rmi.transport=ALL-UNNAMED</string> + <string>--add-exports java.rmi/sun.rmi.transport.tcp=ALL-UNNAMED</string> + <string>--add-exports java.sql/java.sql=ALL-UNNAMED</string> + + <string>--add-opens java.base/java.lang.module=ALL-UNNAMED</string> + <string>--add-opens java.base/java.net=ALL-UNNAMED</string> + <string>--add-opens java.base/jdk.internal.loader=ALL-UNNAMED</string> + <string>--add-opens java.base/jdk.internal.ref=ALL-UNNAMED</string> + <string>--add-opens java.base/jdk.internal.reflect=ALL-UNNAMED</string> + <string>--add-opens java.base/jdk.internal.math=ALL-UNNAMED</string> + <string>--add-opens java.base/jdk.internal.module=ALL-UNNAMED</string> + <string>--add-opens java.base/jdk.internal.util.jar=ALL-UNNAMED</string> + <string>--add-opens jdk.management/com.sun.management.internal=ALL-UNNAMED</string> + + </resources> + <pathconvert property="_jvm_args_concat" refid="_jvm11_arg_items" pathsep=" "/> + <condition property="java11-jvmargs" value="${_jvm_args_concat}" else=""> + <not> + <equals arg1="${ant.java.version}" arg2="1.8"/> + </not> + </condition> + + <!-- + JVM arguments for tests. + + There is a race condition bug in java 11 (see CASSANDRA-15981) which causes a crash of the + JVM; this race is between CMS and class unloading. In java 8 we can cap the metaspace to + make tests stable on low resource environments, but in java 11 we need to make it unlimited + (don't define MaxMetaspaceSize) and disable class unloading in CMS outside of a + stop-the-world pause. + + In java 11 we also need to set a system property to enable netty to use Unsafe direct byte + buffer construction (see CASSANDRA-16493) + --> + <resources id="_jvm8_test_arg_items"> + <!-- TODO see CASSANDRA-16212 - we seem to OOM non stop now after CASSANDRA-16212, so to have clean CI while this gets looked into, disabling limiting metaspace + <string>-XX:MaxMetaspaceExpansion=64M</string> + <string>-XX:MaxMetaspaceSize=512M</string> + <string>-XX:MetaspaceSize=128M</string> + --> + </resources> + <pathconvert property="_jvm8_test_arg_items_concat" refid="_jvm8_test_arg_items" pathsep=" "/> + <resources id="_jvm11_test_arg_items"> + <string>-XX:-CMSClassUnloadingEnabled</string> + <string>-Dio.netty.tryReflectionSetAccessible=true</string> + </resources> + <pathconvert property="_jvm11_test_arg_items_concat" refid="_jvm11_test_arg_items" pathsep=" "/> + <condition property="test-jvmargs" value="${_jvm11_test_arg_items_concat}" else="${_jvm8_test_arg_items_concat}"> + <not> + <equals arg1="${ant.java.version}" arg2="1.8"/> + </not> + </condition> + + <!-- needed to compile org.apache.cassandra.utils.JMXServerUtils --> + <condition property="jdk11-javac-exports" value="--add-exports java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports java.rmi/sun.rmi.transport.tcp=ALL-UNNAMED" else=""> + <not> + <equals arg1="${ant.java.version}" arg2="1.8"/> + </not> + </condition> + <condition property="jdk11-javadoc-exports" value="${jdk11-javac-exports} --frames" else=""> + <not> + <equals arg1="${ant.java.version}" arg2="1.8"/> + </not> + </condition> + + <condition property="build.java.11"> + <istrue value="${use.jdk11}"/> + </condition> + + <condition property="source.version" value="8" else="11"> + <equals arg1="${java.version.8}" arg2="true"/> + </condition> + <condition property="target.version" value="8" else="11"> + <equals arg1="${java.version.8}" arg2="true"/> + </condition> + <!-- Add all the dependencies. --> @@@ -552,35 -383,30 +553,35 @@@ <exclusion groupId="org.hamcrest" artifactId="hamcrest-core"/> </dependency> <dependency groupId="org.mockito" artifactId="mockito-core" version="3.2.4" scope="test"/> + <dependency groupId="org.quicktheories" artifactId="quicktheories" version="0.26" scope="test"/> + <dependency groupId="com.google.code.java-allocation-instrumenter" artifactId="java-allocation-instrumenter" version="${allocation-instrumenter.version}" scope="test"> + <exclusion groupId="com.google.guava" artifactId="guava"/> + </dependency> - <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.15" scope="test"/> + <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.16" scope="test"/> <dependency groupId="org.reflections" artifactId="reflections" version="0.10.2" scope="test"/> - <dependency groupId="org.quicktheories" artifactId="quicktheories" version="0.25" scope="test"/> <dependency groupId="org.apache.hadoop" artifactId="hadoop-core" version="1.0.3" scope="provided"> - <exclusion groupId="org.mortbay.jetty" artifactId="servlet-api"/> - <exclusion groupId="commons-logging" artifactId="commons-logging"/> - <exclusion groupId="org.eclipse.jdt" artifactId="core"/> - <exclusion groupId="ant" artifactId="ant"/> - <exclusion groupId="junit" artifactId="junit"/> + <exclusion groupId="org.mortbay.jetty" artifactId="servlet-api"/> + <exclusion groupId="commons-logging" artifactId="commons-logging"/> + <exclusion groupId="org.eclipse.jdt" artifactId="core"/> + <exclusion groupId="ant" artifactId="ant"/> + <exclusion groupId="junit" artifactId="junit"/> <exclusion groupId="org.codehaus.jackson" artifactId="jackson-mapper-asl"/> + <exclusion groupId="org.slf4j" artifactId="slf4j-api"/> </dependency> <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster" version="1.0.3" scope="provided"> - <exclusion groupId="asm" artifactId="asm"/> <!-- this is the outdated version 3.1 --> + <exclusion groupId="asm" artifactId="asm"/> <!-- this is the outdated version 3.1 --> + <exclusion groupId="org.slf4j" artifactId="slf4j-api"/> <exclusion groupId="org.codehaus.jackson" artifactId="jackson-mapper-asl"/> </dependency> - <dependency groupId="net.java.dev.jna" artifactId="jna" version="4.2.2"/> + <dependency groupId="net.java.dev.jna" artifactId="jna" version="5.6.0"/> - <dependency groupId="org.jacoco" artifactId="org.jacoco.agent" version="${jacoco.version}"/> - <dependency groupId="org.jacoco" artifactId="org.jacoco.ant" version="${jacoco.version}"/> + <dependency groupId="org.jacoco" artifactId="org.jacoco.agent" version="${jacoco.version}" scope="test"/> + <dependency groupId="org.jacoco" artifactId="org.jacoco.ant" version="${jacoco.version}" scope="test"/> - <dependency groupId="org.jboss.byteman" artifactId="byteman-install" version="${byteman.version}"/> - <dependency groupId="org.jboss.byteman" artifactId="byteman" version="${byteman.version}"/> - <dependency groupId="org.jboss.byteman" artifactId="byteman-submit" version="${byteman.version}"/> - <dependency groupId="org.jboss.byteman" artifactId="byteman-bmunit" version="${byteman.version}"/> + <dependency groupId="org.jboss.byteman" artifactId="byteman-install" version="${byteman.version}" scope="provided"/> + <dependency groupId="org.jboss.byteman" artifactId="byteman" version="${byteman.version}" scope="provided"/> + <dependency groupId="org.jboss.byteman" artifactId="byteman-submit" version="${byteman.version}" scope="provided"/> + <dependency groupId="org.jboss.byteman" artifactId="byteman-bmunit" version="${byteman.version}" scope="provided"/> <dependency groupId="net.bytebuddy" artifactId="byte-buddy" version="${bytebuddy.version}" /> <dependency groupId="net.bytebuddy" artifactId="byte-buddy-agent" version="${bytebuddy.version}" /> @@@ -2003,32 -1752,13 +2004,32 @@@ </java> </target> + <target name="_maybe_update_idea_to_java11" if="java.version.11"> + <replace file="${eclipse.project.name}.iml" token="JDK_1_8" value="JDK_11"/> + <replace file=".idea/misc.xml" token="JDK_1_8" value="JDK_11"/> + <replace file=".idea/misc.xml" token="1.8" value="11"/> + <replaceregexp file=".idea/workspace.xml" + match="name="VM_PARAMETERS" value="(.*)" + replace="name="VM_PARAMETERS" value="\1 ${java11-jvmargs}" + byline="true"/> + + <echo file=".idea/compiler.xml"><![CDATA[<?xml version="1.0" encoding="UTF-8"?> +<project version="4"> + <component name="JavacSettings"> - <option name="ADDITIONAL_OPTIONS_STRING" value="--add-exports java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports java.rmi/sun.rmi.transport.tcp=ALL-UNNAMED" /> ++ <option name="ADDITIONAL_OPTIONS_STRING" value="--add-exports java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports java.rmi/sun.rmi.transport.tcp=ALL-UNNAMED --add-exports java.rmi/sun.rmi.transport=ALL-UNNAMED" /> + </component> +</project>]]></echo> + </target> + <!-- Generate IDEA project description files --> - <target name="generate-idea-files" depends="init,maven-ant-tasks-retrieve-build,gen-cql3-grammar,createVersionPropFile" description="Generate IDEA files"> + <target name="generate-idea-files" depends="init,maven-ant-tasks-init,resolver-dist-lib,gen-cql3-grammar,generate-jflex-java,createVersionPropFile" description="Generate IDEA files"> <mkdir dir=".idea"/> <mkdir dir=".idea/libraries"/> - <copy todir=".idea"> + <copy todir=".idea" overwrite="true"> <fileset dir="ide/idea"/> </copy> + <replace file=".idea/workspace.xml" token="trunk" value="${eclipse.project.name}"/> + <replace file=".idea/workspace.xml" token="-Dcassandra.use_nix_recursive_delete=true" value="-Dcassandra.use_nix_recursive_delete=${cassandra.use_nix_recursive_delete}"/> <copy tofile="${eclipse.project.name}.iml" file="ide/idea-iml-file.xml"/> <echo file=".idea/.name">Apache Cassandra ${eclipse.project.name}</echo> <echo file=".idea/modules.xml"><![CDATA[<?xml version="1.0" encoding="UTF-8"?> diff --cc src/java/org/apache/cassandra/utils/RMIClientSocketFactoryImpl.java index 62ab88fb92,c04e934fdc..81e728eeb9 --- a/src/java/org/apache/cassandra/utils/RMIClientSocketFactoryImpl.java +++ b/src/java/org/apache/cassandra/utils/RMIClientSocketFactoryImpl.java @@@ -21,8 -21,11 +21,10 @@@ package org.apache.cassandra.utils import java.io.IOException; import java.io.Serializable; import java.net.InetAddress; -import java.net.ServerSocket; import java.net.Socket; import java.rmi.server.RMIClientSocketFactory; + import java.util.ArrayList; + import java.util.List; import java.util.Objects; /** diff --cc src/java/org/apache/cassandra/utils/ReflectionUtils.java index bd605db3bc,3d1cc29565..e2280e66b0 --- a/src/java/org/apache/cassandra/utils/ReflectionUtils.java +++ b/src/java/org/apache/cassandra/utils/ReflectionUtils.java @@@ -52,4 -56,46 +56,44 @@@ public class ReflectionUtil throw e; } } + + /** + * Used by the in-jvm dtest framework to remove entries from private map fields that otherwise would prevent + * collection of classloaders (which causes metaspace OOMs) or otherwise interfere with instance restart. - * - * @param clazz The class which has the map field to clear - * @param instance an instance of the class to clear (pass null for a static member) - * @param mapName the name of the map field to clear ++ * @param clazz The class which has the map field to clear ++ * @param instance an instance of the class to clear (pass null for a static member) ++ * @param mapName the name of the map field to clear + * @param shouldRemove a predicate which determines if the entry in question should be removed - * @param <K> The type of the map key - * @param <V> The type of the map value ++ * @param <K> The type of the map key ++ * @param <V> The type of the map value + */ - public static <K, V> void clearMapField(Class<?> clazz, Object instance, String mapName, Predicate<Map.Entry<K, V>> shouldRemove) - { ++ public static <K, V> void clearMapField(Class<?> clazz, Object instance, String mapName, Predicate<Map.Entry<K, V>> shouldRemove) { + try + { + Field mapField = getField(clazz, mapName); + mapField.setAccessible(true); + // noinspection unchecked + Map<K, V> map = (Map<K, V>) mapField.get(instance); + // Because multiple instances can be shutting down at once, + // synchronize on the map to avoid ConcurrentModificationException + synchronized (map) + { + // This could be done with a simple `map.entrySet.removeIf()` call + // but for debugging purposes it is much easier to keep it like this. - Iterator<Map.Entry<K, V>> it = map.entrySet().iterator(); ++ Iterator<Map.Entry<K,V>> it = map.entrySet().iterator(); + while (it.hasNext()) + { - Map.Entry<K, V> entry = it.next(); ++ Map.Entry<K,V> entry = it.next(); + if (shouldRemove.test(entry)) + { + it.remove(); + } + } + } + } + catch (NoSuchFieldException | IllegalAccessException ex) + { + throw new RuntimeException(String.format("Could not clear map field %s in class %s", mapName, clazz), ex); + } + } } diff --cc test/distributed/org/apache/cassandra/distributed/impl/IsolatedJmx.java index e19e29fdb3,e586cae58d..2fd4ec9647 --- a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedJmx.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedJmx.java @@@ -18,38 -18,40 +18,40 @@@ package org.apache.cassandra.distributed.impl; - import java.lang.reflect.Field; + import java.io.IOException; import java.net.InetAddress; - import java.net.MalformedURLException; import java.util.HashMap; - import java.util.Iterator; + import java.util.LinkedList; import java.util.Map; + import java.util.concurrent.TimeUnit; + import javax.management.remote.JMXConnector; - import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXConnectorServer; import javax.management.remote.JMXServiceURL; import javax.management.remote.rmi.RMIConnectorServer; import javax.management.remote.rmi.RMIJRMPServerImpl; ++import com.google.common.util.concurrent.Uninterruptibles; import org.slf4j.Logger; +import org.apache.cassandra.distributed.api.IInstance; import org.apache.cassandra.distributed.api.IInstanceConfig; + import org.apache.cassandra.distributed.shared.JMXUtil; -import org.apache.cassandra.distributed.shared.Uninterruptibles; import org.apache.cassandra.utils.JMXServerUtils; import org.apache.cassandra.utils.MBeanWrapper; import org.apache.cassandra.utils.RMIClientSocketFactoryImpl; - import org.apache.cassandra.utils.ReflectionUtils; import sun.rmi.transport.tcp.TCPEndpoint; +import static org.apache.cassandra.config.CassandraRelevantProperties.JAVA_RMI_DGC_LEASE_VALUE_IN_JVM_DTEST; +import static org.apache.cassandra.config.CassandraRelevantProperties.ORG_APACHE_CASSANDRA_DISABLE_MBEAN_REGISTRATION; +import static org.apache.cassandra.config.CassandraRelevantProperties.SUN_RMI_TRANSPORT_TCP_THREADKEEPALIVETIME; import static org.apache.cassandra.distributed.api.Feature.JMX; -import static org.apache.cassandra.utils.MBeanWrapper.IS_DISABLED_MBEAN_REGISTRATION; + import static org.apache.cassandra.utils.ReflectionUtils.clearMapField; public class IsolatedJmx { - private static final int RMI_KEEPALIVE_TIME = 1000; - /** Controls the JMX server threadpool keap-alive time. */ - private static final String SUN_RMI_TRANSPORT_TCP_THREADKEEPALIVETIME = "sun.rmi.transport.tcp.threadKeepAliveTime"; - /** Controls the distributed garbage collector lease time for JMX objects. */ - private static final String JAVA_RMI_DGC_LEASE_VALUE_IN_JVM_DTEST ="java.rmi.dgc.leaseValue"; + public static final int RMI_KEEPALIVE_TIME = 1000; + public static final String UNKNOWN_JMX_CONNECTION_ERROR = "Could not connect to JMX due to an unknown error"; private JMXConnectorServer jmxConnectorServer; private JMXServerUtils.JmxRegistry registry; @@@ -135,36 -139,21 +137,20 @@@ } } - private void waitForJmxAvailability(String hostname, int rmiPort, Map<String, Object> env) throws InterruptedException, MalformedURLException + private void waitForJmxAvailability(Map<String, ?> env) { - String url = String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", hostname, rmiPort); - JMXServiceURL serviceURL = new JMXServiceURL(url); - int attempts = 0; - Throwable lastThrown = null; - while (attempts < 20) - try (JMXConnector ignored = JMXUtil.getJmxConnector(config, 20, env)) - { ++ try (JMXConnector ignored = JMXUtil.getJmxConnector(config, 20, env)) { + // Do nothing - JMXUtil now retries + } + catch (IOException iex) { - attempts++; - try (JMXConnector ignored = JMXConnectorFactory.connect(serviceURL, env)) - { - inInstancelogger.info("Connected to JMX server at {} after {} attempt(s)", - url, attempts); - return; - } - catch (MalformedURLException e) - { - throw new RuntimeException(e); - } - catch (Throwable thrown) - { - lastThrown = thrown; - } - inInstancelogger.info("Could not connect to JMX on {} after {} attempts. Will retry.", url, attempts); - Thread.sleep(1000); + // If we land here, there's something more than a timeout + inInstancelogger.error(UNKNOWN_JMX_CONNECTION_ERROR, iex); + throw new RuntimeException(UNKNOWN_JMX_CONNECTION_ERROR, iex); } - throw new RuntimeException("Could not start JMX - unreachable after 20 attempts", lastThrown); } - public void stopJmx() throws IllegalAccessException, NoSuchFieldException, InterruptedException + public void stopJmx() { if (!config.has(JMX)) return; diff --cc test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java index 2dfcf3f080,3e9798ea02..94256cca1b --- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java +++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java @@@ -101,295 -71,12 +101,309 @@@ public class ClusterUtil Futures.getUnchecked(i.shutdown()); } + /** + * Stops an instance abruptly. This is done by blocking all messages to/from so all other instances are unable + * to communicate, then stopping the instance gracefully. + * + * The assumption is that hard stopping inbound and outbound messages will apear to the cluster as if the instance + * was stopped via kill -9; this does not hold true if the instance is restarted as it knows it was properly shutdown. + * + * @param cluster to filter messages to + * @param inst to shut down + */ + public static <I extends IInstance> void stopAbrupt(ICluster<I> cluster, I inst) + { + // block all messages to/from the node going down to make sure a clean shutdown doesn't happen + IMessageFilters.Filter to = cluster.filters().allVerbs().to(inst.config().num()).drop(); + IMessageFilters.Filter from = cluster.filters().allVerbs().from(inst.config().num()).drop(); + try + { + stopUnchecked(inst); + } + finally + { + from.off(); + to.off(); + } + } + + /** + * Stop all the instances in the cluster. This function is differe than {@link ICluster#close()} as it doesn't + * clean up the cluster state, it only stops all the instances. + */ + public static <I extends IInstance> void stopAll(ICluster<I> cluster) + { + cluster.stream().forEach(ClusterUtils::stopUnchecked); + } + + /** + * Create a new instance and add it to the cluster, without starting it. + * + * @param cluster to add to + * @param dc the instance should be in + * @param rack the instance should be in + * @param <I> instance type + * @return the instance added + */ + public static <I extends IInstance> I addInstance(AbstractCluster<I> cluster, + String dc, String rack) + { + return addInstance(cluster, dc, rack, ignore -> {}); + } + + /** + * Create a new instance and add it to the cluster, without starting it. + * + * @param cluster to add to + * @param dc the instance should be in + * @param rack the instance should be in + * @param fn function to add to the config before starting + * @param <I> instance type + * @return the instance added + */ + public static <I extends IInstance> I addInstance(AbstractCluster<I> cluster, + String dc, String rack, + Consumer<IInstanceConfig> fn) + { + Objects.requireNonNull(dc, "dc"); + Objects.requireNonNull(rack, "rack"); + + InstanceConfig config = cluster.newInstanceConfig(); + //TODO adding new instances should be cleaner, currently requires you create the cluster with all + // instances known about (at least to NetworkTopology and TokenStategy) + // this is very hidden, so should be more explicit + config.networkTopology().put(config.broadcastAddress(), NetworkTopology.dcAndRack(dc, rack)); + + fn.accept(config); + + return cluster.bootstrap(config); + } + + /** + * Create and start a new instance that replaces an existing instance. + * + * The instance will be in the same datacenter and rack as the existing instance. + * + * @param cluster to add to + * @param toReplace instance to replace + * @param <I> instance type + * @return the instance added + */ + public static <I extends IInstance> I replaceHostAndStart(AbstractCluster<I> cluster, IInstance toReplace) + { + return replaceHostAndStart(cluster, toReplace, ignore -> {}); + } + + /** + * Create and start a new instance that replaces an existing instance. + * + * The instance will be in the same datacenter and rack as the existing instance. + * + * @param cluster to add to + * @param toReplace instance to replace + * @param fn lambda to add additional properties + * @param <I> instance type + * @return the instance added + */ + public static <I extends IInstance> I replaceHostAndStart(AbstractCluster<I> cluster, + IInstance toReplace, + Consumer<WithProperties> fn) + { + IInstanceConfig toReplaceConf = toReplace.config(); + I inst = addInstance(cluster, toReplaceConf.localDatacenter(), toReplaceConf.localRack(), c -> c.set("auto_bootstrap", true)); + + return start(inst, properties -> { + // lower this so the replacement waits less time + properties.setProperty("cassandra.broadcast_interval_ms", Long.toString(TimeUnit.SECONDS.toMillis(30))); + // default is 30s, lowering as it should be faster + properties.setProperty("cassandra.ring_delay_ms", Long.toString(TimeUnit.SECONDS.toMillis(10))); + properties.set(BOOTSTRAP_SCHEMA_DELAY_MS, TimeUnit.SECONDS.toMillis(10)); + + // state which node to replace + properties.setProperty("cassandra.replace_address_first_boot", toReplace.config().broadcastAddress().getAddress().getHostAddress()); + + fn.accept(properties); + }); + } + + /** + * Calls {@link org.apache.cassandra.locator.TokenMetadata#sortedTokens()}, returning as a list of strings. + */ + public static List<String> getTokenMetadataTokens(IInvokableInstance inst) + { + return inst.callOnInstance(() -> + StorageService.instance.getTokenMetadata() + .sortedTokens().stream() + .map(Object::toString) + .collect(Collectors.toList())); + } + + public static String getLocalToken(IInvokableInstance inst) + { + return inst.callOnInstance(() -> { + List<String> tokens = new ArrayList<>(); + for (Token t : StorageService.instance.getTokenMetadata().getTokens(FBUtilities.getBroadcastAddressAndPort())) + tokens.add(t.getTokenValue().toString()); + + assert tokens.size() == 1 : "getLocalToken assumes a single token, but multiple tokens found"; + return tokens.get(0); + }); + } + + public static <I extends IInstance> void runAndWaitForLogs(Runnable r, String waitString, AbstractCluster<I> cluster) throws TimeoutException + { + runAndWaitForLogs(r, waitString, cluster.stream().toArray(IInstance[]::new)); + } + + public static void runAndWaitForLogs(Runnable r, String waitString, IInstance...instances) throws TimeoutException + { + long [] marks = new long[instances.length]; + for (int i = 0; i < instances.length; i++) + marks[i] = instances[i].logs().mark(); + r.run(); + for (int i = 0; i < instances.length; i++) + instances[i].logs().watchFor(marks[i], waitString); + } + + + /** + * Get the ring from the perspective of the instance. + */ + public static List<RingInstanceDetails> ring(IInstance inst) + { + NodeToolResult results = inst.nodetoolResult("ring"); + results.asserts().success(); + return parseRing(results.getStdout()); + } + + /** + * Make sure the target instance is in the ring. + * + * @param instance instance to check on + * @param expectedInRing instance expected in the ring + * @return the ring (if target is present) + */ + public static List<RingInstanceDetails> assertInRing(IInstance instance, IInstance expectedInRing) + { + String targetAddress = getBroadcastAddressHostString(expectedInRing); + List<RingInstanceDetails> ring = ring(instance); + Optional<RingInstanceDetails> match = ring.stream().filter(d -> d.address.equals(targetAddress)).findFirst(); + assertThat(match).as("Not expected to find %s but was found", targetAddress).isPresent(); + return ring; + } + + /** + * Make sure the target instance's gossip state matches on the source instance + * + * @param instance instance to check on + * @param expectedInRing instance expected in the ring + * @param state expected gossip state + * @return the ring (if target is present and has expected state) + */ + public static List<RingInstanceDetails> assertRingState(IInstance instance, IInstance expectedInRing, String state) + { + String targetAddress = getBroadcastAddressHostString(expectedInRing); + List<RingInstanceDetails> ring = ring(instance); + List<RingInstanceDetails> match = ring.stream() + .filter(d -> d.address.equals(targetAddress)) + .collect(Collectors.toList()); + assertThat(match) + .isNotEmpty() + .as("State was expected to be %s but was not", state) + .anyMatch(r -> r.state.equals(state)); + return ring; + } + + /** + * Make sure the target instance is NOT in the ring. + * + * @param instance instance to check on + * @param expectedInRing instance not expected in the ring + * @return the ring (if target is not present) + */ + public static List<RingInstanceDetails> assertNotInRing(IInstance instance, IInstance expectedInRing) + { + String targetAddress = getBroadcastAddressHostString(expectedInRing); + List<RingInstanceDetails> ring = ring(instance); + Optional<RingInstanceDetails> match = ring.stream().filter(d -> d.address.equals(targetAddress)).findFirst(); + Assert.assertEquals("Not expected to find " + targetAddress + " but was found", Optional.empty(), match); + return ring; + } + + private static List<RingInstanceDetails> awaitRing(IInstance src, String errorMessage, Predicate<List<RingInstanceDetails>> fn) + { + List<RingInstanceDetails> ring = null; + for (int i = 0; i < 100; i++) + { + ring = ring(src); + if (fn.test(ring)) + { + return ring; + } + sleepUninterruptibly(1, TimeUnit.SECONDS); + } + throw new AssertionError(errorMessage + "\n" + ring); + } + + /** + * Wait for the target to be in the ring as seen by the source instance. + * + * @param instance instance to check on + * @param expectedInRing instance to wait for + * @return the ring + */ + public static List<RingInstanceDetails> awaitRingJoin(IInstance instance, IInstance expectedInRing) + { + return awaitRingJoin(instance, expectedInRing.broadcastAddress().getAddress().getHostAddress()); + } + + /** + * Wait for the target to be in the ring as seen by the source instance. + * + * @param instance instance to check on + * @param expectedInRing instance address to wait for + * @return the ring + */ + public static List<RingInstanceDetails> awaitRingJoin(IInstance instance, String expectedInRing) + { + return awaitRing(instance, "Node " + expectedInRing + " did not join the ring...", ring -> { + Optional<RingInstanceDetails> match = ring.stream().filter(d -> d.address.equals(expectedInRing)).findFirst(); + if (match.isPresent()) + { + RingInstanceDetails details = match.get(); + return details.status.equals("Up") && details.state.equals("Normal"); + } + return false; + }); + } + + /** + * Wait for the ring to only have instances that are Up and Normal. + * + * @param src instance to check on + * @return the ring + */ + public static List<RingInstanceDetails> awaitRingHealthy(IInstance src) + { + return awaitRing(src, "Timeout waiting for ring to become healthy", + ring -> + ring.stream().allMatch(ClusterUtils::isRingInstanceDetailsHealthy)); + } + ++ /** ++ * Wait for the ring to have the target instance with the provided status. ++ * ++ * @param instance instance to check on ++ * @param expectedInRing to look for ++ * @param status expected ++ * @return the ring ++ */ + public static List<RingInstanceDetails> awaitRingStatus(IInstance instance, IInstance expectedInRing, String status) + { + return awaitInstanceMatching(instance, expectedInRing, d -> d.status.equals(status), + "Timeout waiting for " + expectedInRing + " to have status " + status); + } + /** * Wait for the ring to have the target instance with the provided state. * @@@ -400,25 -87,35 +414,33 @@@ */ public static List<RingInstanceDetails> awaitRingState(IInstance instance, IInstance expectedInRing, String state) { - return awaitRing(instance, "Timeout waiting for " + expectedInRing + " to have state " + state, - ring -> - ring.stream() - .filter(d -> d.address.equals(getBroadcastAddressHostString(expectedInRing))) - .filter(d -> d.state.equals(state)) - .findAny().isPresent()); + return awaitInstanceMatching(instance, expectedInRing, d -> d.state.equals(state), + "Timeout waiting for " + expectedInRing + " to have state " + state); + } + + private static List<RingInstanceDetails> awaitInstanceMatching(IInstance instance, + IInstance expectedInRing, + Predicate<RingInstanceDetails> predicate, + String errorMessage) + { + return awaitRing(instance, + errorMessage, + ring -> ring.stream() + .filter(d -> d.address.equals(getBroadcastAddressHostString(expectedInRing))) + .anyMatch(predicate)); } - private static List<RingInstanceDetails> awaitRing(IInstance src, String errorMessage, Predicate<List<RingInstanceDetails>> fn) + /** + * Make sure the ring is only the expected instances. The source instance may not be in the ring, so this function + * only relies on the expectedInsts param. + * + * @param instance instance to check on + * @param expectedInRing expected instances in the ring + * @return the ring (if condition is true) + */ + public static List<RingInstanceDetails> assertRingIs(IInstance instance, IInstance... expectedInRing) { - List<RingInstanceDetails> ring = null; - for (int i = 0; i < 100; i++) - { - ring = ring(src); - if (fn.test(ring)) - { - return ring; - } - sleepUninterruptibly(1, TimeUnit.SECONDS); - } - throw new AssertionError(errorMessage + "\n" + ring); + return assertRingIs(instance, Arrays.asList(expectedInRing)); } /** diff --cc test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java index 83a35e594d,e227a24f8e..ea1efa8902 --- a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java @@@ -31,7 -30,8 +30,9 @@@ import org.apache.cassandra.distributed import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.IInstanceConfig; import org.apache.cassandra.distributed.api.IInvokableInstance; + import org.apache.cassandra.distributed.api.NodeToolResult; +import org.apache.cassandra.distributed.impl.INodeProvisionStrategy; + import org.apache.cassandra.distributed.shared.ClusterUtils; import org.apache.cassandra.distributed.shared.JMXUtil; import org.apache.cassandra.distributed.test.TestBaseImpl; @@@ -45,22 -49,23 +50,33 @@@ public class JMXFeatureTest extends Tes * - Create a cluster with multiple JMX servers, one per instance * - Test that when connecting, we get the correct MBeanServer by checking the default domain, which is set to the IP of the instance * - Run the test multiple times to ensure cleanup of the JMX servers is complete so the next test can run successfully using the same host/port. - * NOTE: In later versions of Cassandra, there is also a `testOneNetworkInterfaceProvisioning` that leverages the ability to specify - * ports in addition to IP/Host for binding, but this version does not support that feature. Keeping the test name the same - * so that it's consistent across versions. * - * @throws Exception + * @throws Exception it's a test that calls JMX endpoints - lots of different Jmx exceptions are possible */ @Test public void testMultipleNetworkInterfacesProvisioning() throws Exception { -- int iterations = 2; // Make sure the JMX infrastructure all cleans up properly by running this multiple times. ++ testJmxFeatures(INodeProvisionStrategy.Strategy.MultipleNetworkInterfaces); ++ } ++ ++ @Test ++ public void testOneNetworkInterfaceProvisioning() throws Exception ++ { ++ testJmxFeatures(INodeProvisionStrategy.Strategy.OneNetworkInterface); ++ } ++ ++ private void testJmxFeatures(INodeProvisionStrategy.Strategy provisionStrategy) throws Exception ++ { Set<String> allInstances = new HashSet<>(); ++ int iterations = 2; // Make sure the JMX infrastructure all cleans up properly by running this multiple times. for (int i = 0; i < iterations; i++) { - try (Cluster cluster = Cluster.build(2).withConfig(c -> c.with(Feature.values())).start()) + try (Cluster cluster = Cluster.build(2) - .withNodeProvisionStrategy(INodeProvisionStrategy.Strategy.MultipleNetworkInterfaces) ++ .withNodeProvisionStrategy(provisionStrategy) + .withConfig(c -> c.with(Feature.values())).start()) { Set<String> instancesContacted = new HashSet<>(); - for (IInvokableInstance instance : cluster.get(1, 2)) + for (IInvokableInstance instance : cluster) { testInstance(instancesContacted, instance); } @@@ -72,33 -80,43 +91,43 @@@ } @Test - public void testOneNetworkInterfaceProvisioning() throws Exception + public void testShutDownAndRestartInstances() throws Exception { - int iterations = 2; // Make sure the JMX infrastructure all cleans up properly by running this multiple times. - Set<String> allInstances = new HashSet<>(); - for (int i = 0; i < iterations; i++) + HashSet<String> instances = new HashSet<>(); + try (Cluster cluster = Cluster.build(2).withConfig(c -> c.with(Feature.values())).start()) { - try (Cluster cluster = Cluster.build(2) - .withNodeProvisionStrategy(INodeProvisionStrategy.Strategy.OneNetworkInterface) - .withConfig(c -> c.with(Feature.values())).start()) - { - Set<String> instancesContacted = new HashSet<>(); - for (IInvokableInstance instance : cluster.get(1, 2)) - { - testInstance(instancesContacted, instance); - } - Assert.assertEquals("Should have connected with both JMX instances.", 2, instancesContacted.size()); - allInstances.addAll(instancesContacted); - } + IInvokableInstance instanceToStop = cluster.get(1); + IInvokableInstance otherInstance = cluster.get(2); + testInstance(instances, cluster.get(1)); - testAllValidGetters(cluster); + ClusterUtils.stopUnchecked(instanceToStop); + // NOTE: This would previously fail because we cleared everything from the TCPEndpoint map in IsolatedJmx. + // Now, we only clear the endpoints related to that instance, which prevents this code from + // breaking with a `java.net.BindException: Address already in use (Bind failed)` + ClusterUtils.awaitRingStatus(otherInstance, instanceToStop, "Down"); + NodeToolResult statusResult = cluster.get(2).nodetoolResult("status"); + Assert.assertEquals(0, statusResult.getRc()); + Assert.assertThat(statusResult.getStderr(), is(blankOrNullString())); + Assert.assertThat(statusResult.getStdout(), containsString("DN 127.0.0.1")); + testInstance(instances, cluster.get(2)); - ClusterUtils.start(instanceToStop, props -> {}); ++ ClusterUtils.start(instanceToStop, props -> { ++ }); + ClusterUtils.awaitRingState(otherInstance, instanceToStop, "Normal"); + ClusterUtils.awaitRingStatus(otherInstance, instanceToStop, "Up"); + statusResult = cluster.get(1).nodetoolResult("status"); + Assert.assertEquals(0, statusResult.getRc()); + Assert.assertThat(statusResult.getStderr(), is(blankOrNullString())); + Assert.assertThat(statusResult.getStdout(), containsString("UN 127.0.0.1")); + statusResult = cluster.get(2).nodetoolResult("status"); + Assert.assertEquals(0, statusResult.getRc()); + Assert.assertThat(statusResult.getStderr(), is(blankOrNullString())); + Assert.assertThat(statusResult.getStdout(), containsString("UN 127.0.0.1")); + testInstance(instances, cluster.get(1)); + testAllValidGetters(cluster); } - Assert.assertEquals("Each instance from each cluster should have been unique", iterations * 2, allInstances.size()); } - private void testInstance(Set<String> instancesContacted, IInvokableInstance instance) throws IOException + private void testInstance(Set<String> instancesContacted, IInvokableInstance instance) { - // NOTE: At some point, the hostname of the broadcastAddress can be resolved - // and then the `getHostString`, which would otherwise return the IP address, - // starts returning `localhost` - use `.getAddress().getHostAddress()` to work around this. IInstanceConfig config = instance.config(); try (JMXConnector jmxc = JMXUtil.getJmxConnector(config)) { diff --cc test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java index ece6052a75,bbcfa52ebc..8221e039db --- a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java @@@ -43,17 -43,24 +43,22 @@@ import org.apache.cassandra.distributed public class JMXGetterCheckTest extends TestBaseImpl { private static final Set<String> IGNORE_ATTRIBUTES = ImmutableSet.of( - "org.apache.cassandra.net:type=MessagingService:BackPressurePerHost" // throws unsupported saying the feature was removed... dropped in CASSANDRA-15375 + "org.apache.cassandra.net:type=MessagingService:BackPressurePerHost", // throws unsupported saying the feature was removed... dropped in CASSANDRA-15375 - "org.apache.cassandra.db:type=StorageService:MaxNativeProtocolVersion" // Throws NPE ++ "org.apache.cassandra.db:type=DynamicEndpointSnitch:Scores" // when running in multiple-port-one-IP mode, this fails ++ ); private static final Set<String> IGNORE_OPERATIONS = ImmutableSet.of( "org.apache.cassandra.db:type=StorageService:stopDaemon", // halts the instance, which then causes the JVM to exit "org.apache.cassandra.db:type=StorageService:drain", // don't drain, it stops things which can cause other APIs to be unstable as we are in a stopped state "org.apache.cassandra.db:type=StorageService:stopGossiping", // if we stop gossip this can cause other issues, so avoid - "org.apache.cassandra.db:type=StorageService:resetLocalSchema" // this will fail when there are no other nodes which can serve schema + "org.apache.cassandra.db:type=StorageService:resetLocalSchema", // this will fail when there are no other nodes which can serve schema + "org.apache.cassandra.db:type=StorageService:joinRing", // Causes bootstrapping errors + "org.apache.cassandra.db:type=StorageService:clearConnectionHistory", // Throws a NullPointerException + "org.apache.cassandra.db:type=StorageService:startGossiping", // causes multiple loops to fail + "org.apache.cassandra.db:type=StorageService:startNativeTransport", // causes multiple loops to fail - "org.apache.cassandra.db:type=Tables,keyspace=system,table=local:loadNewSSTables", // Shouldn't attempt to load SSTables as sometimes the temp directories don't work - "org.apache.cassandra.db:type=HintedHandoffManager:listEndpointsPendingHints", // hard-coded to throw UnsupportedOperationException - "org.apache.cassandra.db:type=StorageService:startRPCServer", // In looped tests this can sometimes fail - "org.apache.cassandra.db:type=StorageService:decommission" // shutting down the node and decommissioning it is bad - in later versions, it takes a parameter and wouldn't be run in this test ++ "org.apache.cassandra.db:type=Tables,keyspace=system,table=local:loadNewSSTables" // Shouldn't attempt to load SSTables as sometimes the temp directories don't work ); - public static final String JMX_SERVICE_URL_FMT = "service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi"; - @Test public void testGetters() throws Exception { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org