arjunashok commented on code in PR #17:
URL:
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1422004520
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java:
##########
@@ -339,140 +336,190 @@ public String getLowestCassandraVersion()
return cassandraVersion;
}
- public String getVersionFromFeature()
+ @Override
+ public Map<RingInstance, InstanceAvailability> getInstanceAvailability()
{
- return null;
+ TokenRangeMapping<RingInstance> mapping = getTokenRangeMapping(true);
+ Map<RingInstance, InstanceAvailability> result =
+ mapping.getReplicaMetadata()
+ .stream()
+ .map(RingInstance::new)
+ .collect(Collectors.toMap(Function.identity(),
this::determineInstanceAvailability));
+
+ if (LOGGER.isDebugEnabled())
+ {
+ result.forEach((inst, avail) -> LOGGER.debug("Instance {} has
availability {}", inst, avail));
+ }
+ return result;
}
- protected List<NodeSettings> getAllNodeSettings()
+ private InstanceAvailability determineInstanceAvailability(RingInstance
instance)
{
- List<NodeSettings> allNodeSettings =
FutureUtils.bestEffortGet(allNodeSettingFutures,
-
conf.getSidecarRequestMaxRetryDelayInSeconds(),
-
TimeUnit.SECONDS);
-
- if (allNodeSettings.isEmpty())
+ if (!instanceIsUp(instance.getRingInstance()))
{
- throw new RuntimeException(String.format("Unable to determine the
node settings. 0/%d instances available.",
-
allNodeSettingFutures.size()));
+ return InstanceAvailability.UNAVAILABLE_DOWN;
}
- else if (allNodeSettings.size() < allNodeSettingFutures.size())
+ if (instanceIsBlocked(instance))
{
- LOGGER.warn("{}/{} instances were used to determine the node
settings",
- allNodeSettings.size(), allNodeSettingFutures.size());
+ return InstanceAvailability.UNAVAILABLE_BLOCKED;
}
-
- return allNodeSettings;
- }
-
- public String getVersionFromSidecar()
- {
- NodeSettings nodeSettings = this.nodeSettings.get();
- if (nodeSettings != null)
+ if (instanceIsNormal(instance.getRingInstance()) ||
+ instanceIsTransitioning(instance.getRingInstance()) ||
+ instanceIsBeingReplaced(instance.getRingInstance()))
{
- return nodeSettings.releaseVersion();
+ return InstanceAvailability.AVAILABLE;
}
- return getLowestVersion(getAllNodeSettings());
+ LOGGER.info("No valid state found for instance {}", instance);
+ // If it's not one of the above, it's inherently INVALID.
+ return InstanceAvailability.INVALID_STATE;
}
- protected RingResponse getRingResponse()
+ private TokenRangeMapping<RingInstance> getTokenRangeReplicas()
{
- RingResponse currentRingResponse = ringResponse;
- if (currentRingResponse != null)
+ Map<String, Set<String>> writeReplicasByDC;
+ Map<String, Set<String>> pendingReplicasByDC;
+ List<ReplicaMetadata> replicaMetadata;
+ Set<RingInstance> blockedInstances;
+ Set<RingInstance> replacementInstances;
+ Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance;
+ try
{
- return currentRingResponse;
- }
+ TokenRangeReplicasResponse response =
getTokenRangesAndReplicaSets();
+ replicaMetadata = response.replicaMetadata();
- synchronized (this)
- {
- if (ringResponse == null)
+ tokenRangesByInstance =
getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata());
+ LOGGER.info("Retrieved token ranges for {} instances from write
replica set ",
+ tokenRangesByInstance.size());
+
+ replacementInstances = response.replicaMetadata()
+ .stream()
+ .filter(m ->
m.state().equalsIgnoreCase(InstanceState.REPLACING.toString()))
+ .map(RingInstance::new)
+ .collect(Collectors.toSet());
+
+ blockedInstances = response.replicaMetadata().stream()
+ .map(RingInstance::new)
+ .filter(this::instanceIsBlocked)
+ .collect(Collectors.toSet());
+
+ Set<String> blockedIps = blockedInstances.stream().map(i ->
i.getRingInstance().address())
+
.collect(Collectors.toSet());
+
+ // Each token range has hosts by DC. We collate them across all
ranges into all hosts by DC
+ writeReplicasByDC = response.writeReplicas()
+ .stream()
+ .flatMap(wr ->
wr.replicasByDatacenter().entrySet().stream())
+
.collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue()),
+ (l1, l2) ->
filterAndMergeInstances(l1, l2, blockedIps)));
+
+ pendingReplicasByDC = getPendingReplicas(response,
writeReplicasByDC);
+
+ if (LOGGER.isDebugEnabled())
{
- try
- {
- ringResponse = getCurrentRingResponse();
- }
- catch (Exception exception)
- {
- LOGGER.error("Failed to load Cassandra ring", exception);
- throw new RuntimeException(exception);
- }
+ LOGGER.debug("Fetched token-ranges with dcs={},
write_replica_count={}, pending_replica_count={}",
+ writeReplicasByDC.keySet(),
+
writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(),
+
pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size());
}
- return ringResponse;
}
- }
+ catch (ExecutionException | InterruptedException exception)
+ {
+ LOGGER.error("Failed to get token ranges, ", exception);
+ throw new RuntimeException(exception);
+ }
- private RingResponse getCurrentRingResponse() throws Exception
- {
- return
getCassandraContext().getSidecarClient().ring(conf.keyspace).get();
+ // Include availability info so CL checks can use it to exclude
replacement hosts
+ return new TokenRangeMapping<>(getPartitioner(),
+ getReplicationFactor(),
+ writeReplicasByDC,
+ pendingReplicasByDC,
+ tokenRangesByInstance,
+ replicaMetadata,
+ blockedInstances,
+ replacementInstances);
}
- private static List<RingInstance> getSerializableInstances(RingResponse
ringResponse)
+ private Set<String> filterAndMergeInstances(Set<String> instancesList1,
Set<String> instancesList2, Set<String> blockedIPs)
{
- return ringResponse.stream()
- .map(RingInstance::new)
- .collect(Collectors.toList());
+ Set<String> merged = new HashSet<>();
+ // Removes blocked instances. If this is included, remove
blockedInstances from CL checks
+ merged.addAll(instancesList1.stream().filter(i ->
!blockedIPs.contains(i)).collect(Collectors.toSet()));
+ merged.addAll(instancesList2.stream().filter(i ->
!blockedIPs.contains(i)).collect(Collectors.toSet()));
+
+ return merged;
}
- private static RingInstance getCasInstanceMethodsImpl(RingEntry ringEntry)
+ // Pending replicas are currently calculated by extracting the non-read-replicas from the write-replica-set
+ // This will be replaced by the instance state metadata when it is supported by the token-ranges API
Review Comment:
Correct. Updated to utilize metadata.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]