This is an automated email from the ASF dual-hosted git repository.

jonmeredith pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
     new ce6a65cb29 Clean up schema migration coordinator and tests
ce6a65cb29 is described below

commit ce6a65cb294af3b68ab2f4bba7945b097de42576
Author: Jon Meredith <jonmered...@apache.org>
AuthorDate: Fri Apr 8 10:30:38 2022 -0600

    Clean up schema migration coordinator and tests
    
    patch by Jon Meredith; reviewed by David Capwell for CASSANDRA-17533
---
 CHANGES.txt                                        |  1 +
 .../config/CassandraRelevantProperties.java        |  4 +++
 .../cassandra/schema/MigrationCoordinator.java     | 34 ++++++++++++++++------
 .../apache/cassandra/schema/MigrationManager.java  | 12 --------
 .../cassandra/distributed/action/GossipHelper.java |  3 +-
 .../cassandra/distributed/impl/Instance.java       | 12 +++++---
 .../distributed/test/MigrationCoordinatorTest.java | 12 ++++----
 7 files changed, 47 insertions(+), 31 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 8fcebb0797..32fdd68552 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0.4
+ * Clean up schema migration coordinator and tests (CASSANDRA-17533)
  * Shut repair task executor down without interruption to avoid compromising shared channel proxies (CASSANDRA-17466)
  * Generate valid KEYSPACE / MATERIALIZED VIEW for CQL for views (CASSANDRA-17266)
  * Fix timestamp tz parsing (CASSANDRA-17467)
diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 0887e1dfd2..4afb1ee9e2 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -152,6 +152,10 @@ public enum CassandraRelevantProperties
      */
     GOSSIPER_QUARANTINE_DELAY("cassandra.gossip_quarantine_delay_ms"),
 
+    IGNORED_SCHEMA_CHECK_VERSIONS("cassandra.skip_schema_check_for_versions"),
+
+    IGNORED_SCHEMA_CHECK_ENDPOINTS("cassandra.skip_schema_check_for_endpoints"),
+
     /**
      * When doing a host replacement its possible that the gossip state is "empty" meaning that the endpoint is known
      * but the current state isn't known.  If the host replacement is needed to repair this state, this property must
diff --git a/src/java/org/apache/cassandra/schema/MigrationCoordinator.java b/src/java/org/apache/cassandra/schema/MigrationCoordinator.java
index 824b83f30a..bf3aee70fc 100644
--- a/src/java/org/apache/cassandra/schema/MigrationCoordinator.java
+++ b/src/java/org/apache/cassandra/schema/MigrationCoordinator.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.LongSupplier;
@@ -61,6 +62,9 @@ import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 
+import static org.apache.cassandra.config.CassandraRelevantProperties.IGNORED_SCHEMA_CHECK_ENDPOINTS;
+import static org.apache.cassandra.config.CassandraRelevantProperties.IGNORED_SCHEMA_CHECK_VERSIONS;
+
 public class MigrationCoordinator
 {
     private static final Logger logger = LoggerFactory.getLogger(MigrationCoordinator.class);
@@ -80,12 +84,9 @@ public class MigrationCoordinator
 
     public static final MigrationCoordinator instance = new MigrationCoordinator();
 
-    public static final String IGNORED_VERSIONS_PROP = "cassandra.skip_schema_check_for_versions";
-    public static final String IGNORED_ENDPOINTS_PROP = "cassandra.skip_schema_check_for_endpoints";
-
     private static ImmutableSet<UUID> getIgnoredVersions()
     {
-        String s = System.getProperty(IGNORED_VERSIONS_PROP);
+        String s = IGNORED_SCHEMA_CHECK_VERSIONS.getString();
         if (s == null || s.isEmpty())
             return ImmutableSet.of();
 
@@ -104,7 +105,7 @@ public class MigrationCoordinator
     {
         Set<InetAddressAndPort> endpoints = new HashSet<>();
 
-        String s = System.getProperty(IGNORED_ENDPOINTS_PROP);
+        String s = IGNORED_SCHEMA_CHECK_ENDPOINTS.getString();
         if (s == null || s.isEmpty())
             return endpoints;
 
@@ -430,13 +431,28 @@ public class MigrationCoordinator
 
     private static Future<?> submitToMigrationIfNotShutdown(Runnable task)
     {
-        if (Stage.MIGRATION.executor().isShutdown() || Stage.MIGRATION.executor().isTerminated())
+        boolean skipped = false;
+        try
+        {
+            if (Stage.MIGRATION.executor().isShutdown() || Stage.MIGRATION.executor().isTerminated())
+            {
+                skipped = true;
+                return null;
+            }
+            return Stage.MIGRATION.submit(task);
+        }
+        catch (RejectedExecutionException ex)
         {
-            logger.info("Skipped scheduled pulling schema from other nodes: the MIGRATION executor service has been shutdown.");
+            skipped = true;
             return null;
         }
-        else
-            return Stage.MIGRATION.submit(task);
+        finally
+        {
+            if (skipped)
+            {
+                logger.info("Skipped scheduled pulling schema from other nodes: the MIGRATION executor service has been shutdown.");
+            }
+        }
     }
 
     @VisibleForTesting
diff --git a/src/java/org/apache/cassandra/schema/MigrationManager.java b/src/java/org/apache/cassandra/schema/MigrationManager.java
index 9ebd33df9f..efe7c33532 100644
--- a/src/java/org/apache/cassandra/schema/MigrationManager.java
+++ b/src/java/org/apache/cassandra/schema/MigrationManager.java
@@ -50,18 +50,6 @@ public class MigrationManager
 
     public static final MigrationManager instance = new MigrationManager();
 
-    private static LongSupplier getUptimeFn = () -> ManagementFactory.getRuntimeMXBean().getUptime();
-
-    @VisibleForTesting
-    public static void setUptimeFn(LongSupplier supplier)
-    {
-        getUptimeFn = supplier;
-    }
-
-    private static final int MIGRATION_DELAY_IN_MS = 60000;
-
-    private static final int MIGRATION_TASK_WAIT_IN_SECONDS = Integer.parseInt(System.getProperty("cassandra.migration_task_wait_in_seconds", "1"));
-
     private MigrationManager() {}
 
     private static boolean shouldPushSchemaTo(InetAddressAndPort endpoint)
diff --git a/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
index dc4f89cef8..a763b45fcb 100644
--- a/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
+++ b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.distributed.impl.DistributedTestSnitch.toCassandraInetAddressAndPort;
+import static org.junit.Assert.assertTrue;
 
 public class GossipHelper
 {
@@ -222,7 +223,7 @@ public class GossipHelper
                 InetAddressAndPort endpoint = toCassandraInetAddressAndPort(pullFrom);
                 EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
                 MigrationCoordinator.instance.reportEndpointVersion(endpoint, state);
-                MigrationCoordinator.instance.awaitSchemaRequests(TimeUnit.SECONDS.toMillis(10));
+                assertTrue("schema is ready", MigrationCoordinator.instance.awaitSchemaRequests(TimeUnit.SECONDS.toMillis(10)));
             }).accept(pullFrom);
         }
     }
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 9ea6d01878..b2edb4bff1 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -38,6 +38,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.management.ListenerNotFoundException;
 import javax.management.Notification;
 import javax.management.NotificationListener;
@@ -102,7 +103,7 @@ import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.NoPayload;
 import org.apache.cassandra.net.Verb;
-import org.apache.cassandra.schema.MigrationManager;
+import org.apache.cassandra.schema.MigrationCoordinator;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -145,7 +146,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
 {
     public final IInstanceConfig config;
     private volatile boolean initialized = false;
-    private final long startedAt = System.nanoTime();
+    private final AtomicLong startedAt = new AtomicLong();
 
     // should never be invoked directly, so that it is instantiated on other class loader;
     // only visible for inheritance
@@ -457,6 +458,8 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
     @Override
     public void startup(ICluster cluster)
     {
+        assert startedAt.compareAndSet(0L, System.nanoTime()) : "startedAt uninitialized";
+
         sync(() -> {
             try
             {
@@ -541,7 +544,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                 StorageService.instance.registerDaemon(CassandraDaemon.getInstanceForTesting());
                 if (config.has(GOSSIP))
                 {
-                    MigrationManager.setUptimeFn(() -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedAt));
+                    MigrationCoordinator.setUptimeFn(() -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedAt.get()));
                     StorageService.instance.initServer();
                     StorageService.instance.removeShutdownHook();
                     Gossiper.waitToSettle();
@@ -753,7 +756,8 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
         }).apply(isolatedExecutor);
 
         return CompletableFuture.runAsync(ThrowingRunnable.toRunnable(future::get), isolatedExecutor)
-                                .thenRun(super::shutdown);
+                                .thenRun(super::shutdown)
+                                .thenRun(() -> startedAt.set(0L));
     }
 
     @Override
diff --git a/test/distributed/org/apache/cassandra/distributed/test/MigrationCoordinatorTest.java b/test/distributed/org/apache/cassandra/distributed/test/MigrationCoordinatorTest.java
index cf895ecdec..ca89b43d0c 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/MigrationCoordinatorTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/MigrationCoordinatorTest.java
@@ -28,9 +28,10 @@ import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.TokenSupplier;
 import org.apache.cassandra.distributed.shared.NetworkTopology;
-import org.apache.cassandra.schema.MigrationCoordinator;
 import org.apache.cassandra.schema.Schema;
 
+import static org.apache.cassandra.config.CassandraRelevantProperties.IGNORED_SCHEMA_CHECK_ENDPOINTS;
+import static org.apache.cassandra.config.CassandraRelevantProperties.IGNORED_SCHEMA_CHECK_VERSIONS;
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 
@@ -42,8 +43,9 @@ public class MigrationCoordinatorTest extends TestBaseImpl
     {
         System.clearProperty("cassandra.replace_address");
         System.clearProperty("cassandra.consistent.rangemovement");
-        System.clearProperty(MigrationCoordinator.IGNORED_ENDPOINTS_PROP);
-        System.clearProperty(MigrationCoordinator.IGNORED_VERSIONS_PROP);
+
+        System.clearProperty(IGNORED_SCHEMA_CHECK_VERSIONS.getKey());
+        System.clearProperty(IGNORED_SCHEMA_CHECK_ENDPOINTS.getKey());
     }
     /**
      * We shouldn't wait on versions only available from a node being replaced
@@ -86,7 +88,7 @@ public class MigrationCoordinatorTest extends TestBaseImpl
 
             IInstanceConfig config = cluster.newInstanceConfig();
             config.set("auto_bootstrap", true);
-            System.setProperty(MigrationCoordinator.IGNORED_ENDPOINTS_PROP, ignoredEndpoint.getHostAddress());
+            IGNORED_SCHEMA_CHECK_ENDPOINTS.setString(ignoredEndpoint.getHostAddress());
             System.setProperty("cassandra.consistent.rangemovement", "false");
             cluster.bootstrap(config).startup();
         }
@@ -113,7 +115,7 @@ public class MigrationCoordinatorTest extends TestBaseImpl
 
             IInstanceConfig config = cluster.newInstanceConfig();
             config.set("auto_bootstrap", true);
-            System.setProperty(MigrationCoordinator.IGNORED_VERSIONS_PROP, initialVersion.toString() + ',' + oldVersion.toString());
+            IGNORED_SCHEMA_CHECK_VERSIONS.setString(initialVersion.toString() + ',' + oldVersion);
             System.setProperty("cassandra.consistent.rangemovement", "false");
             cluster.bootstrap(config).startup();
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to