Move handling of migration event source to solve bootstrap race. Patch by Sergio Bossa and brandonwilliams, reviewed by Tyler Hobbs for CASSANDRA-6648
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/93bd89fb Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/93bd89fb Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/93bd89fb Branch: refs/heads/trunk Commit: 93bd89fbba7d76821d3d85ae371cb2e665176b6a Parents: 34112ef Author: Brandon Williams <brandonwilli...@apache.org> Authored: Tue Feb 4 18:33:44 2014 -0600 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Tue Feb 4 18:33:44 2014 -0600 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../cassandra/service/MigrationManager.java | 33 +++----------------- .../cassandra/service/StorageService.java | 7 ++--- 3 files changed, 9 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/93bd89fb/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b1fade1..d3a78f7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -2,6 +2,8 @@ * Fix direct Memory on architectures that do not support unaligned long access (CASSANDRA-6628) * Let scrub optionally skip broken counter partitions (CASSANDRA-5930) +Merged from 1.2: + * Move handling of migration event source to solve bootstrap race. (CASSANDRA-6648) 2.0.5 http://git-wip-us.apache.org/repos/asf/cassandra/blob/93bd89fb/src/java/org/apache/cassandra/service/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java index 0ffc7c4..b463116 100644 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@ -50,7 +50,7 @@ import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.WrappedRunnable; -public class MigrationManager implements IEndpointStateChangeSubscriber +public class MigrationManager { private static final Logger logger = LoggerFactory.getLogger(MigrationManager.class); @@ -63,7 +63,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber public static final int MIGRATION_DELAY_IN_MS = 60000; private final List<IMigrationListener> listeners = new CopyOnWriteArrayList<IMigrationListener>(); - + private MigrationManager() {} public void register(IMigrationListener listener) @@ -76,37 +76,14 @@ public class MigrationManager implements IEndpointStateChangeSubscriber listeners.remove(listener); } - public void onJoin(InetAddress endpoint, EndpointState epState) - {} - - public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) - {} - - public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) - { - if (state != ApplicationState.SCHEMA || endpoint.equals(FBUtilities.getBroadcastAddress())) - return; - - maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint); - } - - public void onAlive(InetAddress endpoint, EndpointState state) + public void scheduleSchemaPull(InetAddress endpoint, EndpointState state) { VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA); - if (value != null) + if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && value != null) maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint); } - public void onDead(InetAddress endpoint, EndpointState state) - {} - - public void onRestart(InetAddress endpoint, EndpointState state) - {} - - public void onRemove(InetAddress endpoint) - {} - /** * If versions differ this node sends request with local migration list to the endpoint * and expecting to receive a list of migrations to apply locally. @@ -166,7 +143,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber public static boolean isReadyForBootstrap() { - return ((ThreadPoolExecutor) StageManager.getStage(Stage.MIGRATION)).getActiveCount() == 0; + return Schema.instance.getVersion() != null && !Schema.emptyVersion.equals(Schema.instance.getVersion()); } public void notifyCreateKeyspace(KSMetaData ksm) http://git-wip-us.apache.org/repos/asf/cassandra/blob/93bd89fb/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 5f9657d..c222570 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -179,8 +179,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private static enum Mode { STARTING, NORMAL, CLIENT, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED, RELOCATING } private Mode operationMode = Mode.STARTING; - private final MigrationManager migrationManager = MigrationManager.instance; - /* Used for tracking drain progress */ private volatile int totalCFs, remainingCFs; @@ -367,7 +365,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public void stopClient() { - Gossiper.instance.unregister(migrationManager); Gossiper.instance.unregister(this); Gossiper.instance.stop(); MessagingService.instance().shutdown(); @@ -473,7 +470,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE logger.info("Starting up client gossip"); setMode(Mode.CLIENT, false); Gossiper.instance.register(this); - Gossiper.instance.register(migrationManager); Gossiper.instance.start((int) (System.currentTimeMillis() / 1000)); // needed for node-ring gathering. Gossiper.instance.addLocalApplicationState(ApplicationState.NET_VERSION, valueFactory.networkVersion()); @@ -630,7 +626,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE appStates.put(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion()); logger.info("Starting up server gossip"); Gossiper.instance.register(this); - Gossiper.instance.register(migrationManager); Gossiper.instance.start(SystemKeyspace.incrementAndGetGeneration(), appStates); // needed for node-ring gathering. // gossip snitch infos (local DC and rack) gossipSnitchInfo(); @@ -1964,6 +1959,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { onChange(endpoint, entry.getKey(), entry.getValue()); } + MigrationManager.instance.scheduleSchemaPull(endpoint, epState); } public void onAlive(InetAddress endpoint, EndpointState state) @@ -1982,6 +1978,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) subscriber.onJoinCluster(endpoint); } + MigrationManager.instance.scheduleSchemaPull(endpoint, state); } public void onRemove(InetAddress endpoint)