[ https://issues.apache.org/jira/browse/CASSANDRA-6244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ryan Fowler updated CASSANDRA-6244:
-----------------------------------

    Comment: was deleted

(was: commit b94f21925a41736682554c9ff934b1ff591f4711
Author: Ryan Fowler <ryan.fow...@singlewire.com>
Date:   Fri Oct 25 14:20:11 2013 -0500

    Calculate pending ranges asynchronously

    Port of changes from 713bba to 1.2 branch.

diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
new file mode 100644
index 0000000..b408c75
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import org.apache.cassandra.utils.BiMultiValMap;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.Collection;
+import java.util.concurrent.*;
+
+
+public class PendingRangeCalculatorService extends PendingRangeCalculatorServiceMBean
+{
+    public static final PendingRangeCalculatorService instance = new PendingRangeCalculatorService();
+
+    private static Logger logger = LoggerFactory.getLogger(PendingRangeCalculatorService.class);
+
+    private final JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(1, Integer.MAX_VALUE, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<Runnable>(1), new NamedThreadFactory("PendingRangeCalculator"), "internal");
+
+    public PendingRangeCalculatorService()
+    {
+        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
+    }
+
+    private class PendingRangeTask implements Runnable
+    {
+        public void run()
+        {
+            long start = System.currentTimeMillis();
+            for (String table : Schema.instance.getNonSystemTables())
+            {
+                calculatePendingRanges(Table.open(table).getReplicationStrategy(), table);
+            }
+            logger.debug("finished calculation for {} keyspaces in {}ms", Schema.instance.getNonSystemTables().size(), System.currentTimeMillis() - start);
+        }
+    }
+
+    public Future<?> update()
+    {
+        return executor.submit(new PendingRangeTask());
+    }
+
+    public void blockUntilFinished()
+    {
+        while (true)
+        {
+            if (executor.getActiveCount() + executor.getPendingTasks() == 0)
+                break;
+            try
+            {
+                Thread.sleep(100);
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+
+    /**
+     * Calculate pending ranges according to bootstrapping and leaving nodes. Reasoning is:
+     *
+     * (1) When in doubt, it is better to write too much to a node than too little. That is, if
+     * there are multiple nodes moving, calculate the biggest ranges a node could have. Cleaning
+     * up unneeded data afterwards is better than missing writes during movement.
+     * (2) When a node leaves, ranges for other nodes can only grow (a node might get additional
+     * ranges, but it will not lose any of its current ranges as a result of a leave). Therefore
+     * we will first remove _all_ leaving tokens for the sake of calculation and then check what
+     * ranges would go where if all nodes are to leave. This way we get the biggest possible
+     * ranges with regard to current leave operations, covering all subsets of possible final range
+     * values.
+     * (3) When a node bootstraps, ranges of other nodes can only get smaller. Without doing
+     * complex calculations to see if multiple bootstraps overlap, we simply base calculations
+     * on the same token ring used before (reflecting the situation after all leave operations have
+     * completed). Bootstrapping nodes will be added and removed one by one to that metadata and
+     * checked what their ranges would be. This will give us the biggest possible ranges the
+     * node could have. It might be that other bootstraps make our actual final ranges smaller,
+     * but it does not matter as we can clean up the data afterwards.
+     *
+     * NOTE: This is a heavy and inefficient operation. It will be done only once when a node
+     * changes state in the cluster, so it should be manageable.
+     */
+    // public & static for testing purposes
+    public static void calculatePendingRanges(AbstractReplicationStrategy strategy, String table)
+    {
+        TokenMetadata tm = StorageService.instance.getTokenMetadata();
+        Multimap<Range<Token>, InetAddress> pendingRanges = HashMultimap.create();
+        BiMultiValMap<Token, InetAddress> bootstrapTokens = tm.getBootstrapTokens();
+        Set<InetAddress> leavingEndpoints = tm.getLeavingEndpoints();
+
+        if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && tm.getMovingEndpoints().isEmpty() && tm.getRelocatingRanges().isEmpty())
+        {
+            if (logger.isDebugEnabled())
+                logger.debug("No bootstrapping, leaving or moving nodes, and no relocating tokens -> empty pending ranges for {}", table);
+            tm.setPendingRanges(table, pendingRanges);
+            return;
+        }
+
+        Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges();
+
+        // Copy of metadata reflecting the situation after all leave operations are finished.
+        TokenMetadata allLeftMetadata = tm.cloneAfterAllLeft();
+
+        // get all ranges that will be affected by leaving nodes
+        Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
+        for (InetAddress endpoint : leavingEndpoints)
+            affectedRanges.addAll(addressRanges.get(endpoint));
+
+        // for each of those ranges, find what new nodes will be responsible for the range when
+        // all leaving nodes are gone.
+        for (Range<Token> range : affectedRanges)
+        {
+            Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, tm.cloneOnlyTokenMap()));
+            Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
+            pendingRanges.putAll(range, Sets.difference(newEndpoints, currentEndpoints));
+        }
+
+        // At this stage pendingRanges has been updated according to leave operations. We can
+        // now continue the calculation by checking bootstrapping nodes.
+
+        // For each of the bootstrapping nodes, simply add and remove them one by one to
+        // allLeftMetadata and check in between what their ranges would be.
+        for (InetAddress endpoint : bootstrapTokens.inverse().keySet())
+        {
+            Collection<Token> tokens = bootstrapTokens.inverse().get(endpoint);
+
+            allLeftMetadata.updateNormalTokens(tokens, endpoint);
+            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+                pendingRanges.put(range, endpoint);
+            allLeftMetadata.removeEndpoint(endpoint);
+        }
+
+        // At this stage pendingRanges has been updated according to leaving and bootstrapping nodes.
+        // We can now finish the calculation by checking moving and relocating nodes.
+
+        // For each of the moving nodes, we do the same thing we did for bootstrapping:
+        // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
+        for (Pair<Token, InetAddress> moving : tm.getMovingEndpoints())
+        {
+            InetAddress endpoint = moving.right; // address of the moving node
+
+            // moving.left is a new token of the endpoint
+            allLeftMetadata.updateNormalToken(moving.left, endpoint);
+
+            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+            {
+                pendingRanges.put(range, endpoint);
+            }
+
+            allLeftMetadata.removeEndpoint(endpoint);
+        }
+
+        // Ranges being relocated.
+        for (Map.Entry<Token, InetAddress> relocating : tm.getRelocatingRanges().entrySet())
+        {
+            InetAddress endpoint = relocating.getValue(); // address of the moving node
+            Token token = relocating.getKey();
+
+            allLeftMetadata.updateNormalToken(token, endpoint);
+
+            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+                pendingRanges.put(range, endpoint);
+
+            allLeftMetadata.removeEndpoint(endpoint);
+        }
+
+        tm.setPendingRanges(table, pendingRanges);
+
+        if (logger.isDebugEnabled())
+            logger.debug("Pending ranges:\n" + (pendingRanges.isEmpty() ? "<empty>" : tm.printPendingRanges()));
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorServiceMBean.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorServiceMBean.java
new file mode 100644
index 0000000..c9b04f0
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorServiceMBean.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+public class PendingRangeCalculatorServiceMBean
+{
+}
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 54f8abd..be8d057 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -650,6 +650,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             }
         }
         setMode(Mode.JOINING, "schema complete, ready to bootstrap", true);
+        setMode(Mode.JOINING, "waiting for pending range calculation", true);
+        PendingRangeCalculatorService.instance.blockUntilFinished();
+        setMode(Mode.JOINING, "calculation complete, ready to bootstrap", true);

         if (logger.isDebugEnabled())
@@ -1322,7 +1325,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }

         tokenMetadata.addBootstrapTokens(tokens, endpoint);
-        calculatePendingRanges();
+        PendingRangeCalculatorService.instance.update();

         if (Gossiper.instance.usesHostId(endpoint))
             tokenMetadata.updateHostId(Gossiper.instance.getHostId(endpoint), endpoint);
@@ -1458,7 +1461,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             }
         }

-        calculatePendingRanges();
+        PendingRangeCalculatorService.instance.update();
     }

     /**
@@ -1493,7 +1496,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         // at this point the endpoint is certainly a member with this token, so let's proceed
         // normally
         tokenMetadata.addLeavingEndpoint(endpoint);
-        calculatePendingRanges();
+        PendingRangeCalculatorService.instance.update();
     }

     /**
@@ -1531,7 +1534,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         tokenMetadata.addMovingEndpoint(token, endpoint);

-        calculatePendingRanges();
+        PendingRangeCalculatorService.instance.update();
     }

     /**
@@ -1551,7 +1554,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             logger.debug("Tokens {} are relocating to {}", tokens, endpoint);
         tokenMetadata.addRelocatingTokens(tokens, endpoint);

-        calculatePendingRanges();
+        PendingRangeCalculatorService.instance.update();
     }

     /**
@@ -1593,7 +1596,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         // Note that the endpoint is being removed
         tokenMetadata.addLeavingEndpoint(endpoint);
-        calculatePendingRanges();
+        PendingRangeCalculatorService.instance.update();

         // find the endpoint coordinating this removal that we need to notify when we're done
         String[] coordinator = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.REMOVAL_COORDINATOR).value.split(VersionedValue.DELIMITER_STR, -1);
@@ -1616,12 +1619,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         removeEndpoint(endpoint);
         tokenMetadata.removeEndpoint(endpoint);
         tokenMetadata.removeBootstrapTokens(tokens);
+
         if (!isClientMode)
         {
             for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
                 subscriber.onLeaveCluster(endpoint);
         }
-        calculatePendingRanges();
+        PendingRangeCalculatorService.instance.update();
     }

     private void excise(Collection<Token> tokens, InetAddress endpoint, long expireTime)
@@ -1663,124 +1667,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
     }

-    /**
-     * Calculate pending ranges according to bootsrapping and leaving nodes. Reasoning is:
-     *
-     * (1) When in doubt, it is better to write too much to a node than too little. That is, if
-     * there are multiple nodes moving, calculate the biggest ranges a node could have. Cleaning
-     * up unneeded data afterwards is better than missing writes during movement.
-     * (2) When a node leaves, ranges for other nodes can only grow (a node might get additional
-     * ranges, but it will not lose any of its current ranges as a result of a leave). Therefore
-     * we will first remove _all_ leaving tokens for the sake of calculation and then check what
-     * ranges would go where if all nodes are to leave. This way we get the biggest possible
-     * ranges with regard current leave operations, covering all subsets of possible final range
-     * values.
-     * (3) When a node bootstraps, ranges of other nodes can only get smaller. Without doing
-     * complex calculations to see if multiple bootstraps overlap, we simply base calculations
-     * on the same token ring used before (reflecting situation after all leave operations have
-     * completed). Bootstrapping nodes will be added and removed one by one to that metadata and
-     * checked what their ranges would be. This will give us the biggest possible ranges the
-     * node could have. It might be that other bootstraps make our actual final ranges smaller,
-     * but it does not matter as we can clean up the data afterwards.
-     *
-     * NOTE: This is heavy and ineffective operation. This will be done only once when a node
-     * changes state in the cluster, so it should be manageable.
-     */
-    private void calculatePendingRanges()
-    {
-        for (String table : Schema.instance.getNonSystemTables())
-            calculatePendingRanges(Table.open(table).getReplicationStrategy(), table);
-    }
-
-    // public & static for testing purposes
-    public static void calculatePendingRanges(AbstractReplicationStrategy strategy, String table)
-    {
-        TokenMetadata tm = StorageService.instance.getTokenMetadata();
-        Multimap<Range<Token>, InetAddress> pendingRanges = HashMultimap.create();
-        BiMultiValMap<Token, InetAddress> bootstrapTokens = tm.getBootstrapTokens();
-        Set<InetAddress> leavingEndpoints = tm.getLeavingEndpoints();
-
-        if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && tm.getMovingEndpoints().isEmpty() && tm.getRelocatingRanges().isEmpty())
-        {
-            if (logger.isDebugEnabled())
-                logger.debug("No bootstrapping, leaving or moving nodes, and no relocating tokens -> empty pending ranges for {}", table);
-            tm.setPendingRanges(table, pendingRanges);
-            return;
-        }
-
-        Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges();
-
-        // Copy of metadata reflecting the situation after all leave operations are finished.
-        TokenMetadata allLeftMetadata = tm.cloneAfterAllLeft();
-
-        // get all ranges that will be affected by leaving nodes
-        Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
-        for (InetAddress endpoint : leavingEndpoints)
-            affectedRanges.addAll(addressRanges.get(endpoint));
-
-        // for each of those ranges, find what new nodes will be responsible for the range when
-        // all leaving nodes are gone.
-        for (Range<Token> range : affectedRanges)
-        {
-            Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, tm.cloneOnlyTokenMap()));
-            Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
-            pendingRanges.putAll(range, Sets.difference(newEndpoints, currentEndpoints));
-        }
-
-        // At this stage pendingRanges has been updated according to leave operations. We can
-        // now continue the calculation by checking bootstrapping nodes.
-
-        // For each of the bootstrapping nodes, simply add and remove them one by one to
-        // allLeftMetadata and check in between what their ranges would be.
-        for (InetAddress endpoint : bootstrapTokens.inverse().keySet())
-        {
-            Collection<Token> tokens = bootstrapTokens.inverse().get(endpoint);
-
-            allLeftMetadata.updateNormalTokens(tokens, endpoint);
-            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-                pendingRanges.put(range, endpoint);
-            allLeftMetadata.removeEndpoint(endpoint);
-        }
-
-        // At this stage pendingRanges has been updated according to leaving and bootstrapping nodes.
-        // We can now finish the calculation by checking moving and relocating nodes.
-
-        // For each of the moving nodes, we do the same thing we did for bootstrapping:
-        // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
-        for (Pair<Token, InetAddress> moving : tm.getMovingEndpoints())
-        {
-            InetAddress endpoint = moving.right; // address of the moving node
-
-            // moving.left is a new token of the endpoint
-            allLeftMetadata.updateNormalToken(moving.left, endpoint);
-
-            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-            {
-                pendingRanges.put(range, endpoint);
-            }
-
-            allLeftMetadata.removeEndpoint(endpoint);
-        }
-
-        // Ranges being relocated.
-        for (Map.Entry<Token, InetAddress> relocating : tm.getRelocatingRanges().entrySet())
-        {
-            InetAddress endpoint = relocating.getValue(); // address of the moving node
-            Token token = relocating.getKey();
-
-            allLeftMetadata.updateNormalToken(token, endpoint);
-
-            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-                pendingRanges.put(range, endpoint);
-
-            allLeftMetadata.removeEndpoint(endpoint);
-        }
-
-        tm.setPendingRanges(table, pendingRanges);
-
-        if (logger.isDebugEnabled())
"<empty>" : tm.printPendingRanges())); - } /** * Finds living endpoints responsible for the given ranges @@ -1982,7 +1868,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public void onRemove(InetAddress endpoint) { tokenMetadata.removeEndpoint(endpoint); - calculatePendingRanges(); + PendingRangeCalculatorService.instance.update(); } public void onDead(InetAddress endpoint, EndpointState state) @@ -2772,7 +2658,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.leaving(getLocalTokens())); tokenMetadata.addLeavingEndpoint(FBUtilities.getBroadcastAddress()); - calculatePendingRanges(); + PendingRangeCalculatorService.instance.update(); } public void decommission() throws InterruptedException @@ -2781,6 +2667,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE throw new UnsupportedOperationException("local node is not a member of the token ring yet"); if (tokenMetadata.cloneAfterAllLeft().sortedTokens().size() < 2) throw new UnsupportedOperationException("no other normal nodes in the ring; decommission would be pointless"); + PendingRangeCalculatorService.instance.blockUntilFinished(); for (String table : Schema.instance.getNonSystemTables()) { if (tokenMetadata.getPendingRanges(table, FBUtilities.getBroadcastAddress()).size() > 0) @@ -2812,7 +2699,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { SystemTable.setBootstrapState(SystemTable.BootstrapState.NEEDS_BOOTSTRAP); tokenMetadata.removeEndpoint(FBUtilities.getBroadcastAddress()); - calculatePendingRanges(); + PendingRangeCalculatorService.instance.update(); Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.left(getLocalTokens(),Gossiper.computeExpireTime())); int delay = Math.max(RING_DELAY, Gossiper.intervalInMillis * 2); @@ -2943,6 +2830,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE List<String> tablesToProcess = Schema.instance.getNonSystemTables(); + PendingRangeCalculatorService.instance.blockUntilFinished(); // checking if data is moving to this node for (String table : tablesToProcess) { @@ -3279,7 +3167,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE removingNode = endpoint; tokenMetadata.addLeavingEndpoint(endpoint); - calculatePendingRanges(); + PendingRangeCalculatorService.instance.update(); + // the gossiper will handle spoofing this node's state to REMOVING_TOKEN for us // we add our own token so other nodes to let us know when they're done Gossiper.instance.advertiseRemoving(endpoint, hostId, localHostId); diff --git a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java index a457df8..95edc03 100644 --- a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java +++ b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java @@ -27,6 +27,7 @@ import java.util.HashSet; import java.util.List; import org.apache.cassandra.config.Schema; +import org.apache.cassandra.service.PendingRangeCalculatorService; import org.junit.Test; import org.apache.cassandra.SchemaLoader; @@ -139,7 +140,7 @@ public class SimpleStrategyTest extends SchemaLoader { strategy = getStrategy(table, tmd); - StorageService.calculatePendingRanges(strategy, table); + PendingRangeCalculatorService.calculatePendingRanges(strategy, table); int 
         int replicationFactor = strategy.getReplicationFactor();
)

> calculatePendingRanges could be asynchronous on 1.2 too
> -------------------------------------------------------
>
>                 Key: CASSANDRA-6244
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-6244
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>         Environment: Cassandra 1.2, AWS
>            Reporter: Ryan Fowler
>             Fix For: 1.2.12
>
>         Attachments: 6244.txt
>
>
> calculatePendingRanges can hang up the Gossip thread to the point of a node marking all the other nodes down.
> I noticed that the same problem was resolved with CASSANDRA-5135, so I attempted to port the patch from that issue to the 1.2 codebase.



--
This message was sent by Atlassian JIRA
(v6.1#6144)
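
Editor's note (not part of the original JIRA message): the heart of the patch above is how it keeps the gossip thread from ever blocking on a recalculation. The executor has one worker thread and a queue that holds at most one waiting task, with a DiscardPolicy for overflow, so a burst of gossip state changes coalesces into at most one queued run, and whichever run eventually executes sees the latest cluster state. The sketch below illustrates that coalescing pattern using only plain java.util.concurrent types; the class and method names are illustrative, not Cassandra's API (the real service uses JMXEnabledThreadPoolExecutor and its getPendingTasks()).

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the coalescing-executor pattern from the patch.
public class CoalescingExecutorSketch
{
    // One worker thread, and room for exactly one waiting task.
    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(1));

    public CoalescingExecutorSketch()
    {
        // If one task is running and another is already queued, silently drop
        // new submissions: the queued run will observe the latest state anyway,
        // and the caller (here, the gossip thread) must never block on this.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
    }

    // Fire-and-forget request for a recalculation.
    public void update(Runnable recalculation)
    {
        executor.execute(recalculation);
    }

    // Poll until nothing is queued or running, as blockUntilFinished() does.
    public void blockUntilFinished() throws InterruptedException
    {
        while (executor.getActiveCount() + executor.getQueue().size() > 0)
            Thread.sleep(100);
    }
}

One caveat worth noting with this pattern: when a submission is discarded, a Future obtained from submit() would never complete, so callers should either use execute() as above or, as the 1.2 patch's callers do, never wait on the Future returned by update() and instead rely on blockUntilFinished() at the few points that need a completed calculation (bootstrap, decommission).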