p/4443/020_calculate_pending Wire a collection of relocating tokens into the calculation of endpoints for pending ranges.
Patch by eevans; reviewed by Brandon Williams for CASSANDRA-4559 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9f643a4a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9f643a4a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9f643a4a Branch: refs/heads/trunk Commit: 9f643a4ae62200d47fbe64f2b44260b767449767 Parents: 1f36519 Author: Eric Evans <eev...@apache.org> Authored: Fri Sep 14 10:08:53 2012 -0500 Committer: Eric Evans <eev...@apache.org> Committed: Fri Sep 14 10:11:00 2012 -0500 ---------------------------------------------------------------------- .../apache/cassandra/locator/TokenMetadata.java | 91 ++++++++++++++- .../apache/cassandra/service/StorageService.java | 16 +++- 2 files changed, 105 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f643a4a/src/java/org/apache/cassandra/locator/TokenMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java index c666c47..0ecb125 100644 --- a/src/java/org/apache/cassandra/locator/TokenMetadata.java +++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java @@ -84,6 +84,8 @@ public class TokenMetadata // nodes which are migrating to the new tokens in the ring private final Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<Pair<Token, InetAddress>>(); + // tokens which are migrating to new endpoints + private final Map<Token, InetAddress> relocatingTokens = new HashMap<Token, InetAddress>(); /* Use this lock for manipulating the token map */ private final ReadWriteLock lock = new ReentrantReadWriteLock(true); @@ -347,6 +349,33 @@ public class TokenMetadata } } + /** + * Add new relocating ranges (tokens moving from their respective endpoints, to another). + * @param tokens tokens being moved + * @param endpoint destination of moves + */ + public void addRelocatingTokens(Collection<Token> tokens, InetAddress endpoint) + { + assert endpoint != null; + assert tokens != null && tokens.size() > 0; + + lock.writeLock().lock(); + + try + { + for (Token token : tokens) + { + InetAddress prev = relocatingTokens.put(token, endpoint); + if (prev != null && !prev.equals(endpoint)) + logger.warn("Relocation of {} to {} overwrites previous to {}", new Object[]{token, endpoint, prev}); + } + } + finally + { + lock.writeLock().unlock(); + } + } + public void removeEndpoint(InetAddress endpoint) { assert endpoint != null; @@ -396,6 +425,38 @@ public class TokenMetadata } } + /** + * Remove pair of token/address from relocating ranges. + * @param endpoint + */ + public void removeFromRelocating(Token token, InetAddress endpoint) + { + assert endpoint != null; + assert token != null; + + lock.writeLock().lock(); + + try + { + InetAddress previous = relocatingTokens.remove(token); + + if (previous == null) + { + logger.debug("Cannot remove {}, not found among the relocating (previously removed?)", token); + } + else if (!previous.equals(endpoint)) + { + logger.warn( + "Removal of relocating token {} with mismatched endpoint ({} != {})", + new Object[]{token, endpoint, previous}); + } + } + finally + { + lock.writeLock().unlock(); + } + } + public Collection<Token> getTokens(InetAddress endpoint) { assert endpoint != null; @@ -470,6 +531,22 @@ public class TokenMetadata } } + public boolean isRelocating(Token token) + { + assert token != null; + + lock.readLock().lock(); + + try + { + return relocatingTokens.containsKey(token); + } + finally + { + lock.readLock().unlock(); + } + } + /** * Create a copy of TokenMetadata with only tokenToEndpointMap. That is, pending ranges, * bootstrap tokens and leaving endpoints are not included in the copy. @@ -513,7 +590,7 @@ public class TokenMetadata /** * Create a copy of TokenMetadata with tokenToEndpointMap reflecting situation after all - * current leave and move operations have finished. + * current leave, move, and relocate operations have finished. * * @return new token metadata */ @@ -532,6 +609,9 @@ public class TokenMetadata for (Pair<Token, InetAddress> pair : movingEndpoints) metadata.updateNormalToken(pair.left, pair.right); + for (Map.Entry<Token, InetAddress> relocating: relocatingTokens.entrySet()) + metadata.updateNormalToken(relocating.getKey(), relocating.getValue()); + return metadata; } finally @@ -654,6 +734,15 @@ public class TokenMetadata return movingEndpoints; } + /** + * Ranges which are migrating to new endpoints. + * @return set of token-address pairs of relocating ranges + */ + public Map<Token, InetAddress> getRelocatingRanges() + { + return relocatingTokens; + } + public static int firstTokenIndex(final ArrayList ring, Token start, boolean insertMin) { assert ring.size() > 0; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f643a4a/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 88780dd..07ee2bd 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1525,7 +1525,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe } // At this stage pendingRanges has been updated according to leaving and bootstrapping nodes. - // We can now finish the calculation by checking moving nodes. + // We can now finish the calculation by checking moving and relocating nodes. // For each of the moving nodes, we do the same thing we did for bootstrapping: // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be. @@ -1544,6 +1544,20 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe allLeftMetadata.removeEndpoint(endpoint); } + // Ranges being relocated. + for (Map.Entry<Token, InetAddress> relocating : tm.getRelocatingRanges().entrySet()) + { + InetAddress endpoint = relocating.getValue(); // address of the moving node + Token token = relocating.getKey(); + + allLeftMetadata.updateNormalToken(token, endpoint); + + for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) + pendingRanges.put(range, endpoint); + + allLeftMetadata.removeEndpoint(endpoint); + } + tm.setPendingRanges(table, pendingRanges); if (logger.isDebugEnabled())