This is an automated email from the ASF dual-hosted git repository. jonmeredith pushed a commit to branch cassandra-4.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.0 by this push: new ce6a65cb29 Clean up schema migration coordinator and tests ce6a65cb29 is described below commit ce6a65cb294af3b68ab2f4bba7945b097de42576 Author: Jon Meredith <jonmered...@apache.org> AuthorDate: Fri Apr 8 10:30:38 2022 -0600 Clean up schema migration coordinator and tests patch by Jon Meredith; reviewed by David Capwell for CASSANDRA-17533 --- CHANGES.txt | 1 + .../config/CassandraRelevantProperties.java | 4 +++ .../cassandra/schema/MigrationCoordinator.java | 34 ++++++++++++++++------ .../apache/cassandra/schema/MigrationManager.java | 12 -------- .../cassandra/distributed/action/GossipHelper.java | 3 +- .../cassandra/distributed/impl/Instance.java | 12 +++++--- .../distributed/test/MigrationCoordinatorTest.java | 12 ++++---- 7 files changed, 47 insertions(+), 31 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 8fcebb0797..32fdd68552 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0.4 + * Clean up schema migration coordinator and tests (CASSANDRA-17533) * Shut repair task executor down without interruption to avoid compromising shared channel proxies (CASSANDRA-17466) * Generate valid KEYSPACE / MATERIALIZED VIEW for CQL for views (CASSANDRA-17266) * Fix timestamp tz parsing (CASSANDRA-17467) diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index 0887e1dfd2..4afb1ee9e2 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -152,6 +152,10 @@ public enum CassandraRelevantProperties */ GOSSIPER_QUARANTINE_DELAY("cassandra.gossip_quarantine_delay_ms"), + IGNORED_SCHEMA_CHECK_VERSIONS("cassandra.skip_schema_check_for_versions"), + + IGNORED_SCHEMA_CHECK_ENDPOINTS("cassandra.skip_schema_check_for_endpoints"), + /** * When doing a host replacement its possible that the gossip state is "empty" meaning that the endpoint is known * but the current state isn't known. If the host replacement is needed to repair this state, this property must diff --git a/src/java/org/apache/cassandra/schema/MigrationCoordinator.java b/src/java/org/apache/cassandra/schema/MigrationCoordinator.java index 824b83f30a..bf3aee70fc 100644 --- a/src/java/org/apache/cassandra/schema/MigrationCoordinator.java +++ b/src/java/org/apache/cassandra/schema/MigrationCoordinator.java @@ -32,6 +32,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.LongSupplier; @@ -61,6 +62,9 @@ import org.apache.cassandra.net.Verb; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.WaitQueue; +import static org.apache.cassandra.config.CassandraRelevantProperties.IGNORED_SCHEMA_CHECK_ENDPOINTS; +import static org.apache.cassandra.config.CassandraRelevantProperties.IGNORED_SCHEMA_CHECK_VERSIONS; + public class MigrationCoordinator { private static final Logger logger = LoggerFactory.getLogger(MigrationCoordinator.class); @@ -80,12 +84,9 @@ public class MigrationCoordinator public static final MigrationCoordinator instance = new MigrationCoordinator(); - public static final String IGNORED_VERSIONS_PROP = "cassandra.skip_schema_check_for_versions"; - public static final String IGNORED_ENDPOINTS_PROP = "cassandra.skip_schema_check_for_endpoints"; - private static ImmutableSet<UUID> getIgnoredVersions() { - String s = System.getProperty(IGNORED_VERSIONS_PROP); + String s = IGNORED_SCHEMA_CHECK_VERSIONS.getString(); if (s == null || s.isEmpty()) return ImmutableSet.of(); @@ -104,7 +105,7 @@ public class MigrationCoordinator { Set<InetAddressAndPort> endpoints = new HashSet<>(); - String s = System.getProperty(IGNORED_ENDPOINTS_PROP); + String s = IGNORED_SCHEMA_CHECK_ENDPOINTS.getString(); if (s == null || s.isEmpty()) return endpoints; @@ -430,13 +431,28 @@ public class MigrationCoordinator private static Future<?> submitToMigrationIfNotShutdown(Runnable task) { - if (Stage.MIGRATION.executor().isShutdown() || Stage.MIGRATION.executor().isTerminated()) + boolean skipped = false; + try + { + if (Stage.MIGRATION.executor().isShutdown() || Stage.MIGRATION.executor().isTerminated()) + { + skipped = true; + return null; + } + return Stage.MIGRATION.submit(task); + } + catch (RejectedExecutionException ex) { - logger.info("Skipped scheduled pulling schema from other nodes: the MIGRATION executor service has been shutdown."); + skipped = true; return null; } - else - return Stage.MIGRATION.submit(task); + finally + { + if (skipped) + { + logger.info("Skipped scheduled pulling schema from other nodes: the MIGRATION executor service has been shutdown."); + } + } } @VisibleForTesting diff --git a/src/java/org/apache/cassandra/schema/MigrationManager.java b/src/java/org/apache/cassandra/schema/MigrationManager.java index 9ebd33df9f..efe7c33532 100644 --- a/src/java/org/apache/cassandra/schema/MigrationManager.java +++ b/src/java/org/apache/cassandra/schema/MigrationManager.java @@ -50,18 +50,6 @@ public class MigrationManager public static final MigrationManager instance = new MigrationManager(); - private static LongSupplier getUptimeFn = () -> ManagementFactory.getRuntimeMXBean().getUptime(); - - @VisibleForTesting - public static void setUptimeFn(LongSupplier supplier) - { - getUptimeFn = supplier; - } - - private static final int MIGRATION_DELAY_IN_MS = 60000; - - private static final int MIGRATION_TASK_WAIT_IN_SECONDS = Integer.parseInt(System.getProperty("cassandra.migration_task_wait_in_seconds", "1")); - private MigrationManager() {} private static boolean shouldPushSchemaTo(InetAddressAndPort endpoint) diff --git a/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java index dc4f89cef8..a763b45fcb 100644 --- a/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java +++ b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java @@ -50,6 +50,7 @@ import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; import static org.apache.cassandra.distributed.impl.DistributedTestSnitch.toCassandraInetAddressAndPort; +import static org.junit.Assert.assertTrue; public class GossipHelper { @@ -222,7 +223,7 @@ public class GossipHelper InetAddressAndPort endpoint = toCassandraInetAddressAndPort(pullFrom); EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); MigrationCoordinator.instance.reportEndpointVersion(endpoint, state); - MigrationCoordinator.instance.awaitSchemaRequests(TimeUnit.SECONDS.toMillis(10)); + assertTrue("schema is ready", MigrationCoordinator.instance.awaitSchemaRequests(TimeUnit.SECONDS.toMillis(10))); }).accept(pullFrom); } } diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index 9ea6d01878..b2edb4bff1 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -38,6 +38,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import javax.management.ListenerNotFoundException; import javax.management.Notification; import javax.management.NotificationListener; @@ -102,7 +103,7 @@ import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.NoPayload; import org.apache.cassandra.net.Verb; -import org.apache.cassandra.schema.MigrationManager; +import org.apache.cassandra.schema.MigrationCoordinator; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.service.ActiveRepairService; @@ -145,7 +146,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance { public final IInstanceConfig config; private volatile boolean initialized = false; - private final long startedAt = System.nanoTime(); + private final AtomicLong startedAt = new AtomicLong(); // should never be invoked directly, so that it is instantiated on other class loader; // only visible for inheritance @@ -457,6 +458,8 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance @Override public void startup(ICluster cluster) { + assert startedAt.compareAndSet(0L, System.nanoTime()) : "startedAt uninitialized"; + sync(() -> { try { @@ -541,7 +544,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance StorageService.instance.registerDaemon(CassandraDaemon.getInstanceForTesting()); if (config.has(GOSSIP)) { - MigrationManager.setUptimeFn(() -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedAt)); + MigrationCoordinator.setUptimeFn(() -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedAt.get())); StorageService.instance.initServer(); StorageService.instance.removeShutdownHook(); Gossiper.waitToSettle(); @@ -753,7 +756,8 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance }).apply(isolatedExecutor); return CompletableFuture.runAsync(ThrowingRunnable.toRunnable(future::get), isolatedExecutor) - .thenRun(super::shutdown); + .thenRun(super::shutdown) + .thenRun(() -> startedAt.set(0L)); } @Override diff --git a/test/distributed/org/apache/cassandra/distributed/test/MigrationCoordinatorTest.java b/test/distributed/org/apache/cassandra/distributed/test/MigrationCoordinatorTest.java index cf895ecdec..ca89b43d0c 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/MigrationCoordinatorTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/MigrationCoordinatorTest.java @@ -28,9 +28,10 @@ import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.IInstanceConfig; import org.apache.cassandra.distributed.api.TokenSupplier; import org.apache.cassandra.distributed.shared.NetworkTopology; -import org.apache.cassandra.schema.MigrationCoordinator; import org.apache.cassandra.schema.Schema; +import static org.apache.cassandra.config.CassandraRelevantProperties.IGNORED_SCHEMA_CHECK_ENDPOINTS; +import static org.apache.cassandra.config.CassandraRelevantProperties.IGNORED_SCHEMA_CHECK_VERSIONS; import static org.apache.cassandra.distributed.api.Feature.GOSSIP; import static org.apache.cassandra.distributed.api.Feature.NETWORK; @@ -42,8 +43,9 @@ public class MigrationCoordinatorTest extends TestBaseImpl { System.clearProperty("cassandra.replace_address"); System.clearProperty("cassandra.consistent.rangemovement"); - System.clearProperty(MigrationCoordinator.IGNORED_ENDPOINTS_PROP); - System.clearProperty(MigrationCoordinator.IGNORED_VERSIONS_PROP); + + System.clearProperty(IGNORED_SCHEMA_CHECK_VERSIONS.getKey()); + System.clearProperty(IGNORED_SCHEMA_CHECK_VERSIONS.getKey()); } /** * We shouldn't wait on versions only available from a node being replaced @@ -86,7 +88,7 @@ public class MigrationCoordinatorTest extends TestBaseImpl IInstanceConfig config = cluster.newInstanceConfig(); config.set("auto_bootstrap", true); - System.setProperty(MigrationCoordinator.IGNORED_ENDPOINTS_PROP, ignoredEndpoint.getHostAddress()); + IGNORED_SCHEMA_CHECK_ENDPOINTS.setString(ignoredEndpoint.getHostAddress()); System.setProperty("cassandra.consistent.rangemovement", "false"); cluster.bootstrap(config).startup(); } @@ -113,7 +115,7 @@ public class MigrationCoordinatorTest extends TestBaseImpl IInstanceConfig config = cluster.newInstanceConfig(); config.set("auto_bootstrap", true); - System.setProperty(MigrationCoordinator.IGNORED_VERSIONS_PROP, initialVersion.toString() + ',' + oldVersion.toString()); + IGNORED_SCHEMA_CHECK_VERSIONS.setString(initialVersion.toString() + ',' + oldVersion); System.setProperty("cassandra.consistent.rangemovement", "false"); cluster.bootstrap(config).startup(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org