yifan-c commented on code in PR #17:
URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1416591381
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##########
@@ -207,46 +203,62 @@ private Set<RingInstance> instancesFromMapping(Map<Range<BigInteger>, List<RingI
     private StreamSession maybeCreateStreamSession(TaskContext taskContext,
                                                    StreamSession streamSession,
                                                    Tuple2<DecoratedKey, Object[]> rowData,
-                                                   Set<Range<BigInteger>> newRanges,
-                                                   ReplicaAwareFailureHandler<RingInstance> failureHandler) throws IOException
+                                                   Set<Range<BigInteger>> subRanges,
+                                                   ReplicaAwareFailureHandler<RingInstance> failureHandler,
+                                                   List<StreamResult> results)
+    throws IOException, ExecutionException, InterruptedException
     {
         BigInteger token = rowData._1().getToken();
         Range<BigInteger> tokenRange = getTokenRange(taskContext);
         Preconditions.checkState(tokenRange.contains(token),
                                  String.format("Received Token %s outside of expected range %s", token, tokenRange));
-        // token range for this partition is not among the write-replica-set ranges
-        if (!newRanges.contains(tokenRange))
+        // We have split ranges likely resulting from pending nodes
+        // Evaluate creating a new session if the token from current row is part of a sub-range
+        if (subRanges.size() > 1)
         {
-            Set<Range<BigInteger>> subRanges = getIntersectingSubRanges(newRanges, tokenRange);
-            // We have split ranges - likely resulting from pending nodes
-            if (subRanges.size() > 1)
-            {
-                // Create session using sub-range that contains the token from current row
-                Range<BigInteger> matchingRange = subRanges.stream().filter(r -> r.contains(token)).findFirst().get();
-                Preconditions.checkState(matchingRange != null,
-                                         String.format("Received Token %s outside of expected range %s", token, matchingRange));
+            // Create session using sub-range that contains the token from current row
+            Range<BigInteger> matchingSubRange = subRanges.stream().filter(r -> r.contains(token)).findFirst().get();
+            Preconditions.checkState(matchingSubRange != null,
+                                     String.format("Received Token %s outside of expected range %s", token, matchingSubRange));
Review Comment:
The `checkState` will never see `matchingSubRange == null`: at line#222, if no sub-range matches the token, the `get()` call has already thrown.
If the intent is to provide a more user-friendly error message, don't call `get()`; capture the result as `Optional<Range<BigInteger>> matchingSubRangeOpt` and run the `checkState` on the optional instead.
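A minimal sketch of that suggestion (illustrative only; `subRanges`, `token`, and `tokenRange` are the locals already in scope in `maybeCreateStreamSession`):

```java
Optional<Range<BigInteger>> matchingSubRangeOpt = subRanges.stream()
                                                           .filter(r -> r.contains(token))
                                                           .findFirst();
// Fails with a descriptive message instead of an opaque NoSuchElementException from get()
Preconditions.checkState(matchingSubRangeOpt.isPresent(),
                         String.format("Received Token %s outside of expected range %s", token, tokenRange));
Range<BigInteger> matchingSubRange = matchingSubRangeOpt.get();
```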
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##########
@@ -132,37 +133,32 @@ public List<StreamResult> write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceI
         Map<String, Object> valueMap = new HashMap<>();
         try
         {
-            List<RingInstance> exclusions = failureHandler.getFailedInstances();
             Set<Range<BigInteger>> newRanges = initialTokenRangeMapping.getRangeMap().asMapOfRanges().entrySet()
                                                                        .stream()
-                                                                       .filter(e -> !exclusions.contains(e.getValue()))
                                                                        .map(Map.Entry::getKey)
                                                                        .collect(Collectors.toSet());
+            Range<BigInteger> tokenRange = getTokenRange(taskContext);
+            Set<Range<BigInteger>> subRanges = newRanges.contains(tokenRange) ?
+                                               Collections.singleton(tokenRange) :
+                                               getIntersectingSubRanges(newRanges, tokenRange);
             while (dataIterator.hasNext())
             {
                 Tuple2<DecoratedKey, Object[]> rowData = dataIterator.next();
-                streamSession = maybeCreateStreamSession(taskContext, streamSession, rowData, newRanges, failureHandler);
-
-                sessions.add(streamSession);
+                streamSession = maybeCreateStreamSession(taskContext, streamSession, rowData, subRanges, failureHandler, results);
                 maybeCreateTableWriter(partitionId, baseDir);
                 writeRow(rowData, valueMap, partitionId, streamSession.getTokenRange());
                 checkBatchSize(streamSession, partitionId, job);
             }
-            // Finalize SSTable for the last StreamSession
-            if (sstableWriter != null || (streamSession != null && batchSize != 0))
+            // Cleanup SSTable writer and schedule the last stream
Review Comment:
"Cleanup SSTable writer" reads wrong to me. I would stick with "Finalize".
The code is to flush any data to sstable by closing the writer. Cleanup leads
me to think it remove files.
##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java:
##########
@@ -50,13 +51,36 @@ private TokenRangeMappingUtils()
public static TokenRangeMapping<RingInstance> buildTokenRangeMapping(final
int initialToken, final ImmutableMap<String, Integer> rfByDC, int
instancesPerDC)
{
+ return buildTokenRangeMapping(initialToken, rfByDC, instancesPerDC,
false, -1);
+ }
+ public static TokenRangeMapping<RingInstance>
buildTokenRangeMappingWithFailures(int initialToken,
+
ImmutableMap<String, Integer> rfByDC,
+
int instancesPerDC)
+ {
final List<RingInstance> instances = getInstances(initialToken,
rfByDC, instancesPerDC);
+ RingInstance instance = instances.remove(0);
+ RingEntry entry = instance.getRingInstance();
+ RingEntry newEntry = new RingEntry.Builder()
+ .datacenter(entry.datacenter())
+ .port(entry.port())
+ .address(entry.address())
+ .status(InstanceStatus.DOWN.name())
+ .state(entry.state())
+ .token(entry.token())
+ .fqdn(entry.fqdn())
+ .rack(entry.rack())
+ .owns(entry.owns())
+ .load(entry.load())
+ .hostId(entry.hostId())
+ .build();
+ RingInstance newInstance = new RingInstance(newEntry);
+ instances.add(0, newInstance);
ReplicationFactor replicationFactor = getReplicationFactor(rfByDC);
Map<String, Set<String>> writeReplicas =
- instances.stream()
- .collect(Collectors.groupingBy(RingInstance::getDataCenter,
-
Collectors.mapping(RingInstance::getNodeName, Collectors.toSet())));
+
instances.stream().collect(Collectors.groupingBy(RingInstance::getDataCenter,
+
Collectors.mapping(RingInstance::getNodeName,
+
Collectors.toSet())));
Review Comment:
nit: prefer not to reformat when there is no actual code change.
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidator.java:
##########
@@ -23,103 +23,120 @@
import java.util.AbstractMap;
import java.util.Collection;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
import com.google.common.collect.Multimap;
import com.google.common.collect.Range;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
+import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
-public class BulkWriteValidator implements AutoCloseable
+public class BulkWriteValidator
{
private static final Logger LOGGER =
LoggerFactory.getLogger(BulkWriteValidator.class);
private final ReplicaAwareFailureHandler<RingInstance> failureHandler;
- private final CassandraRingMonitor monitor;
private final JobInfo job;
private String phase = "Initializing";
private final ClusterInfo cluster;
public BulkWriteValidator(BulkWriterContext bulkWriterContext,
- Consumer<CancelJobEvent> cancelJobFunc) throws
Exception
+ ReplicaAwareFailureHandler<RingInstance>
failureHandler)
{
cluster = bulkWriterContext.cluster();
job = bulkWriterContext.job();
- failureHandler = new
ReplicaAwareFailureHandler<>(cluster.getRing(true));
- monitor = new CassandraRingMonitor(cluster, cancelJobFunc, 1000,
TimeUnit.MILLISECONDS);
+ this.failureHandler = failureHandler;
}
- public void setPhase(String phase)
- {
- this.phase = phase;
- }
-
- public String getPhase()
- {
- return phase;
- }
-
- public void validateInitialEnvironment()
- {
- validateCLOrFail();
- }
-
- public void close()
- {
- monitor.stop();
- }
-
- private void validateCLOrFail()
- {
- updateInstanceAvailability();
- validateClOrFail(failureHandler, LOGGER, phase, job);
- }
-
- public static void
validateClOrFail(ReplicaAwareFailureHandler<RingInstance> failureHandler,
+ public static void validateClOrFail(TokenRangeMapping<RingInstance>
tokenRangeMapping,
+
ReplicaAwareFailureHandler<RingInstance> failureHandler,
Logger logger,
String phase,
JobInfo job)
{
Collection<AbstractMap.SimpleEntry<Range<BigInteger>,
Multimap<RingInstance, String>>> failedRanges =
- failureHandler.getFailedEntries(job.getConsistencyLevel(),
job.getLocalDC());
+ failureHandler.getFailedEntries(tokenRangeMapping,
job.getConsistencyLevel(), job.getLocalDC());
+
if (failedRanges.isEmpty())
{
logger.info("Succeeded {} with {}", phase,
job.getConsistencyLevel());
}
else
{
- String message = String.format("Failed to load %s ranges with %s
for job %s in phase %s",
+ String message = String.format("Failed to load %s ranges with %s
for job %s in phase %s.",
failedRanges.size(),
job.getConsistencyLevel(), job.getId(), phase);
logger.error(message);
- failedRanges.forEach(failedRange ->
failedRange.getValue().keySet().forEach(instance ->
- logger.error("Failed {} for {} on {}", phase,
failedRange.getKey(), instance.toString())));
+ failedRanges.forEach(failedRange ->
+ failedRange.getValue()
+ .keySet()
+ .forEach(instance ->
+ logger.error("Failed {}
for {} on {}",
+ phase,
+
failedRange.getKey(),
+
instance.toString())));
throw new RuntimeException(message);
}
}
- public static void updateFailureHandler(CommitResult commitResult,
- String phase,
+ public String getPhase()
+ {
+ return phase;
+ }
+
+ public void setPhase(String phase)
+ {
+ this.phase = phase;
+ }
+
+ public static void updateFailureHandler(CommitResult commitResult, String
phase,
ReplicaAwareFailureHandler<RingInstance> failureHandler)
{
LOGGER.debug("Commit Result: {}", commitResult);
commitResult.failures.forEach((uuid, err) -> {
LOGGER.warn("[{}]: {} failed on {} with message {}",
- uuid, phase, commitResult.instance.getNodeName(),
err.errMsg);
+ uuid,
+ phase,
+ commitResult.instance.getNodeName(),
+ err.errMsg);
failureHandler.addFailure(err.tokenRange, commitResult.instance,
err.errMsg);
});
}
+ public void validateClOrFail(TokenRangeMapping<RingInstance>
tokenRangeMapping)
+ {
+ // Updates failures by looking up instance metadata
+ updateInstanceAvailability();
+ // Fails if the failures violate consistency requirements
+ validateClOrFail(tokenRangeMapping, failureHandler, LOGGER, phase,
job);
+ }
+
private void updateInstanceAvailability()
{
cluster.refreshClusterInfo();
Map<RingInstance, InstanceAvailability> availability =
cluster.getInstanceAvailability();
availability.forEach(this::checkInstance);
}
+ private void updateFailureHandler(RingInstance instance,
InstanceAvailability availability)
Review Comment:
This method is only called from `private void checkInstance(RingInstance instance, InstanceAvailability availability)`, which already throws an exception when
`availability == InstanceAvailability.INVALID_STATE`. That makes re-checking the condition in this method unnecessary.
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CommitCoordinator.java:
##########
@@ -128,61 +128,51 @@ private Stream<ListenableFuture<CommitResult>>
commit(Map<RingInstance, Listenin
RingInstance
instance,
Map<String,
Range<BigInteger>> uploadRanges)
{
- if (cluster.instanceIsAvailable(instance))
Review Comment:
What if the instance is down? The new code will try to commit the SSTables on those down instances. Should it skip contacting the Sidecar and mark the instance as a failure directly?
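A rough sketch of that suggestion, assuming the previously removed `cluster.instanceIsAvailable(instance)` check is still available and using a hypothetical `addFailures(...)` helper to record the failed ranges (that helper name is an assumption, not the existing API):

```java
// Sketch only: skip the Sidecar commit call for unavailable instances and record the failure instead
if (!cluster.instanceIsAvailable(instance))
{
    uploadRanges.forEach((uuid, range) -> addFailures(range, instance, "Instance is down; skipping commit"));
    return Stream.empty();
}
// otherwise fall through to the existing commit path against the Sidecar
```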
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java:
##########
@@ -145,32 +157,27 @@ private List<CommitResult> commit(StreamResult
streamResult) throws ExecutionExc
@VisibleForTesting
List<RingInstance> getReplicas()
{
- Map<Range<BigInteger>, List<RingInstance>> overlappingRanges =
ring.getSubRanges(tokenRange).asMapOfRanges();
+ List<RingInstance> exclusions = failureHandler.getFailedInstances();
+ final Map<Range<BigInteger>, List<RingInstance>> overlappingRanges =
tokenRangeMapping.getSubRanges(tokenRange).asMapOfRanges();
Review Comment:
drop the `final` for the locally-scoped variable.
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ReplicaAwareFailureHandler.java:
##########
@@ -71,64 +72,138 @@ public void addFailure(Range<BigInteger> tokenRange,
Instance casInstance, Strin
for (Map.Entry<Range<BigInteger>, Multimap<Instance, String>> entry :
overlappingFailures.asMapOfRanges().entrySet())
{
Multimap<Instance, String> newErrorMap =
ArrayListMultimap.create(entry.getValue());
-
newErrorMap.put(casInstance, errMessage);
mappingsToAdd.put(entry.getKey(), newErrorMap);
}
failedRangesMap.putAll(mappingsToAdd);
}
- public boolean hasFailed(ConsistencyLevel consistencyLevel, String localDC)
+ public List<Instance> getFailedInstances()
Review Comment:
How about returning a `Set` instead of a `List`?
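A minimal sketch of the suggested signature, assuming the failures live in the `failedRangesMap` range map referenced above (the exact collection type may differ):

```java
public Set<Instance> getFailedInstances()
{
    // A Set conveys that each failed instance is reported once, regardless of how many ranges it failed on
    return failedRangesMap.asMapOfRanges()
                          .values()
                          .stream()
                          .flatMap(errors -> errors.keySet().stream())
                          .collect(Collectors.toSet());
}
```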
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java:
##########
@@ -19,19 +19,42 @@
package org.apache.cassandra.spark.bulkwriter.token;
-import java.util.Collection;
+import java.util.Set;
-import com.google.common.base.Preconditions;
-
-import org.apache.cassandra.spark.common.model.CassandraInstance;
-import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public interface ConsistencyLevel
{
boolean isLocal();
- boolean checkConsistency(Collection<? extends CassandraInstance>
failedInsts, ReplicationFactor replicationFactor, String localDC);
-
+ Logger LOGGER = LoggerFactory.getLogger(ConsistencyLevel.class);
+
+ /**
+ * Checks if the consistency guarantees are maintained, given the failed,
blocked and replacing instances, consistency-level and the replication-factor.
+ * <pre>
+ * - QUORUM based consistency levels check for quorum using the
write-replica-set (instead of RF) as they include healthy and pending nodes.
+ * This is done to ensure that writes go to a quorum of healthy nodes
while accounting for potential failure in pending nodes becoming healthy.
+ * - ONE and TWO consistency guarantees are maintained by ensuring that
the failures leave us with at-least the corresponding healthy
+ * (and non-pending) nodes.
+ *
+ * For both the above cases, blocked instances are also considered as
failures while performing consistency checks.
+ * Write replicas are adjusted to exclude replacement nodes for
consistency checks, if we have replacement nodes that are not among the failed
instances.
+ * This is to ensure that we are writing to sufficient non-replacement
nodes as replacements can potentially fail leaving us with fewer nodes.
+ * </pre>
+ *
+ * TODO javadocs
Review Comment:
Is this TODO addressed?
##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java:
##########
@@ -69,64 +78,217 @@ public class RecordWriterTest
@BeforeEach
public void setUp()
{
- tw = new MockTableWriter(folder);
- ring = RingUtils.buildRing(0, "DC1", "test", 12);
- writerContext = new MockBulkWriterContext(ring);
+ tw = new MockTableWriter(folder.getRoot());
+ tokenRangeMapping =
TokenRangeMappingUtils.buildTokenRangeMapping(100000, ImmutableMap.of("DC1",
3), 12);
+ writerContext = new MockBulkWriterContext(tokenRangeMapping);
tc = new TestTaskContext();
range =
writerContext.job().getTokenPartitioner().getTokenRange(tc.partitionId());
tokenizer = new Tokenizer(writerContext);
}
@Test
- public void testSuccessfulWrite()
+ public void testWriteFailWhenTopologyChangeWithinTask()
+ {
+ // Generate token range mapping to simulate node movement of the first
node by assigning it a different token
+ // within the same partition
+ int moveTargetToken = 50000;
+ TokenRangeMapping<RingInstance> testMapping =
+ TokenRangeMappingUtils.buildTokenRangeMapping(100000,
+ ImmutableMap.of("DC1",
3),
+ 12,
+ true,
+ moveTargetToken);
+
+ MockBulkWriterContext m = Mockito.spy(writerContext);
+ rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SSTableWriter::new);
+
+
when(m.getTokenRangeMapping(false)).thenCallRealMethod().thenReturn(testMapping);
+ Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
+ RuntimeException ex = assertThrows(RuntimeException.class, () ->
rw.write(data));
+ assertThat(ex.getMessage(), endsWith("Token range mappings have
changed since the task started"));
+ }
+
+ @Test
+ public void testWriteWithExclusions()
+ {
+ TokenRangeMapping<RingInstance> testMapping =
+ TokenRangeMappingUtils.buildTokenRangeMappingWithFailures(100000,
+ ImmutableMap.of("DC1",
3),
+ 12);
+
+ MockBulkWriterContext m = Mockito.spy(writerContext);
+ rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SSTableWriter::new);
+
+ when(m.getTokenRangeMapping(anyBoolean())).thenReturn(testMapping);
+ when(m.getInstanceAvailability()).thenCallRealMethod();
+ Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
+ rw.write(data);
+ Map<CassandraInstance, List<UploadRequest>> uploads =
writerContext.getUploads();
+ assertThat(uploads.keySet().size(), is(REPLICA_COUNT)); // Should
upload to 3 replicas
+ }
+
+ @Test
+ public void testSuccessfulWrite() throws InterruptedException
{
Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
validateSuccessfulWrite(writerContext, data, COLUMN_NAMES);
}
@Test
- public void testWriteWithConstantTTL()
+ public void testSuccessfulWriteCheckUploads()
{
- MockBulkWriterContext bulkWriterContext = new
MockBulkWriterContext(ring);
+ rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc,
SSTableWriter::new);
+ Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
+ rw.write(data);
+ Map<CassandraInstance, List<UploadRequest>> uploads =
writerContext.getUploads();
+ assertThat(uploads.keySet().size(), is(REPLICA_COUNT)); // Should
upload to 3 replicas
+ assertThat(uploads.values().stream().mapToInt(List::size).sum(),
is(REPLICA_COUNT * FILES_PER_SSTABLE * UPLOADED_TABLES));
+ List<UploadRequest> requests =
uploads.values().stream().flatMap(List::stream).collect(Collectors.toList());
+ for (UploadRequest ur : requests)
+ {
+ assertNotNull(ur.fileHash);
+ }
+ }
+
+ @Test
+ public void testWriteWithConstantTTL() throws InterruptedException
+ {
+ MockBulkWriterContext bulkWriterContext = new
MockBulkWriterContext(tokenRangeMapping);
Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true,
false, false);
validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES);
}
@Test
- public void testWriteWithTTLColumn()
+ public void testWriteWithTTLColumn() throws InterruptedException
{
- MockBulkWriterContext bulkWriterContext = new
MockBulkWriterContext(ring);
+ MockBulkWriterContext bulkWriterContext = new
MockBulkWriterContext(tokenRangeMapping);
Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true,
true, false);
- String[] columnNamesWithTtl = {"id", "date", "course", "marks", "ttl"};
+ String[] columnNamesWithTtl =
+ {
+ "id", "date", "course", "marks", "ttl"
+ };
validateSuccessfulWrite(bulkWriterContext, data, columnNamesWithTtl);
}
@Test
- public void testWriteWithConstantTimestamp()
+ public void testWriteWithConstantTimestamp() throws InterruptedException
{
- MockBulkWriterContext bulkWriterContext = new
MockBulkWriterContext(ring);
+ MockBulkWriterContext bulkWriterContext = new
MockBulkWriterContext(tokenRangeMapping);
Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true,
false, false);
validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES);
}
@Test
- public void testWriteWithTimestampColumn()
+ public void testWriteWithTimestampColumn() throws InterruptedException
{
- MockBulkWriterContext bulkWriterContext = new
MockBulkWriterContext(ring);
+ MockBulkWriterContext bulkWriterContext = new
MockBulkWriterContext(tokenRangeMapping);
Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true,
false, true);
- String[] columnNamesWithTimestamp = {"id", "date", "course", "marks",
"timestamp"};
+ String[] columnNamesWithTimestamp =
+ {
+ "id", "date", "course", "marks", "timestamp"
+ };
validateSuccessfulWrite(bulkWriterContext, data,
columnNamesWithTimestamp);
}
@Test
- public void testWriteWithTimestampAndTTLColumn()
+ public void testWriteWithTimestampAndTTLColumn() throws
InterruptedException
{
- MockBulkWriterContext bulkWriterContext = new
MockBulkWriterContext(ring);
+ MockBulkWriterContext bulkWriterContext = new
MockBulkWriterContext(tokenRangeMapping);
Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true,
true, true);
- String[] columnNames = {"id", "date", "course", "marks", "ttl",
"timestamp"};
+ String[] columnNames =
+ {
+ "id", "date", "course", "marks", "ttl", "timestamp"
+ };
validateSuccessfulWrite(bulkWriterContext, data, columnNames);
}
+ @Test
+ public void testWriteWithSubRanges()
+ {
+ MockBulkWriterContext m = Mockito.spy(writerContext);
+ TokenPartitioner mtp = Mockito.mock(TokenPartitioner.class);
+ when(m.job().getTokenPartitioner()).thenReturn(mtp);
+
+ // Override partition's token range to span across ranges to force a
split into sub-ranges
+ Range<BigInteger> overlapRange =
Range.closed(BigInteger.valueOf(-9223372036854775808L),
BigInteger.valueOf(200000));
+ when(mtp.getTokenRange(anyInt())).thenReturn(overlapRange);
+
+ rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SSTableWriter::new);
+ Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
+ List<StreamResult> res = rw.write(data);
+ assertEquals(1, res.size());
+ assertNotEquals(overlapRange, res.get(0).tokenRange);
+ final Map<CassandraInstance, List<UploadRequest>> uploads =
m.getUploads();
+ // Should upload to 3 replicas
+ assertEquals(uploads.keySet().size(), REPLICA_COUNT);
+ assertThat(uploads.values().stream().mapToInt(List::size).sum(),
is(REPLICA_COUNT * FILES_PER_SSTABLE * UPLOADED_TABLES));
+ List<UploadRequest> requests =
uploads.values().stream().flatMap(List::stream).collect(Collectors.toList());
+ for (UploadRequest ur : requests)
+ {
+ assertNotNull(ur.fileHash);
+ }
+ }
+
+ @Test
+ public void testWriteWithSubRanges2()
Review Comment:
Can the test name be more descriptive? If a comment would help, please add one as well.
##########
cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java:
##########
@@ -90,16 +88,25 @@ public InstanceMetadata instanceFromId(int id) throws
NoSuchElementException
* @return instance meta information
* @throws NoSuchElementException when the instance for {@code host}
does not exist
*/
+ @Override
public InstanceMetadata instanceFromHost(String host) throws
NoSuchElementException
{
- return cassandraTestContext.instancesConfig.instanceFromHost(host);
+ return
cassandraTestContext.instancesConfig().instanceFromHost(host);
}
}
@Provides
@Singleton
public SidecarConfiguration sidecarConfiguration()
{
- return new SidecarConfigurationImpl(new
ServiceConfigurationImpl("127.0.0.1"));
+ ServiceConfiguration conf = ServiceConfigurationImpl.builder()
+ .host("0.0.0.0")
// binds to all interfaces, potential security issue if left running for long
Review Comment:
Is it required to bind to `0.0.0.0`?
##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java:
##########
@@ -82,6 +106,7 @@ public static TokenRangeMapping<RingInstance>
buildTokenRangeMapping(final int i
Collections.emptySet());
}
+
Review Comment:
nit: remove duplicated empty lines
##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java:
##########
@@ -346,19 +366,22 @@ void writeBuffered()
private void validateSuccessfulWrite(MockBulkWriterContext writerContext,
Iterator<Tuple2<DecoratedKey,
Object[]>> data,
- String[] columnNames)
+ String[] columnNames) throws
InterruptedException
{
validateSuccessfulWrite(writerContext, data, columnNames,
UPLOADED_TABLES);
}
private void validateSuccessfulWrite(MockBulkWriterContext writerContext,
Iterator<Tuple2<DecoratedKey,
Object[]>> data,
String[] columnNames,
- int uploadedTables)
+ int uploadedTables) throws
InterruptedException
{
RecordWriter rw = new RecordWriter(writerContext, columnNames, () ->
tc, SSTableWriter::new);
rw.write(data);
+ // Wait for uploads to finish
+ Thread.sleep(500);
Map<CassandraInstance, List<UploadRequest>> uploads =
writerContext.getUploads();
+ System.out.println("Uploads in test: " +
uploads.values().stream().mapToInt(List::size).sum());
Review Comment:
Remove it, or use a logger instead of `System.out.println`.
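If the output is worth keeping, a small sketch using an SLF4J logger (the logger field shown here is an assumption; the test class would need to declare it):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

private static final Logger LOGGER = LoggerFactory.getLogger(RecordWriterTest.class);

// Inside validateSuccessfulWrite, replacing the println
LOGGER.debug("Uploads in test: {}", uploads.values().stream().mapToInt(List::size).sum());
```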
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java:
##########
@@ -98,39 +95,19 @@ public void insert(@NotNull Dataset<Row> data, boolean
overwrite)
Tokenizer tokenizer = new Tokenizer(writerContext);
TableSchema tableSchema = writerContext.schema().getTableSchema();
JavaPairRDD<DecoratedKey, Object[]> sortedRDD = data.toJavaRDD()
- .map(Row::toSeq)
- .map(seq ->
JavaConverters.seqAsJavaListConverter(seq).asJava().toArray())
- .map(tableSchema::normalize)
- .keyBy(tokenizer::getDecoratedKey)
-
.repartitionAndSortWithinPartitions(broadcastContext.getValue().job().getTokenPartitioner());
+ .map(Row::toSeq)
+ .map(seq ->
JavaConverters.seqAsJavaListConverter(seq).asJava().toArray())
+
.map(tableSchema::normalize)
+
.keyBy(tokenizer::getDecoratedKey)
+
.repartitionAndSortWithinPartitions(broadcastContext.getValue().job().getTokenPartitioner());
Review Comment:
The original indentation was chosen for readability. The reformatting changes nothing except the indentation; such reformatting is not preferred because it creates noise for review.
##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java:
##########
@@ -346,19 +366,22 @@ void writeBuffered()
private void validateSuccessfulWrite(MockBulkWriterContext writerContext,
Iterator<Tuple2<DecoratedKey,
Object[]>> data,
- String[] columnNames)
+ String[] columnNames) throws
InterruptedException
{
validateSuccessfulWrite(writerContext, data, columnNames,
UPLOADED_TABLES);
}
private void validateSuccessfulWrite(MockBulkWriterContext writerContext,
Iterator<Tuple2<DecoratedKey,
Object[]>> data,
String[] columnNames,
- int uploadedTables)
+ int uploadedTables) throws
InterruptedException
{
RecordWriter rw = new RecordWriter(writerContext, columnNames, () ->
tc, SSTableWriter::new);
rw.write(data);
+ // Wait for uploads to finish
+ Thread.sleep(500);
Review Comment:
Using `sleep` is prone to producing flaky tests, since the required delay depends heavily on the test runtime; we should do our best to avoid it.
`rw.write` is already blocking and does not return until the SSTables are uploaded, so I am not sure why the sleep is needed.
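If a wait really is unavoidable, polling for the condition is less fragile than a fixed sleep. A sketch using Awaitility (assuming the test module adds that dependency):

```java
import static org.awaitility.Awaitility.await;
import java.time.Duration;

// Poll until the expected number of uploads is visible, instead of sleeping a fixed 500 ms
await().atMost(Duration.ofSeconds(10))
       .until(() -> writerContext.getUploads().values().stream().mapToInt(List::size).sum()
                    >= REPLICA_COUNT * FILES_PER_SSTABLE * uploadedTables);
```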
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java:
##########
@@ -339,140 +336,190 @@ public String getLowestCassandraVersion()
return cassandraVersion;
}
- public String getVersionFromFeature()
+ @Override
+ public Map<RingInstance, InstanceAvailability> getInstanceAvailability()
{
- return null;
+ TokenRangeMapping<RingInstance> mapping = getTokenRangeMapping(true);
+ Map<RingInstance, InstanceAvailability> result =
+ mapping.getReplicaMetadata()
+ .stream()
+ .map(RingInstance::new)
+ .collect(Collectors.toMap(Function.identity(),
this::determineInstanceAvailability));
+
+ if (LOGGER.isDebugEnabled())
+ {
+ result.forEach((inst, avail) -> LOGGER.debug("Instance {} has
availability {}", inst, avail));
+ }
+ return result;
}
- protected List<NodeSettings> getAllNodeSettings()
+ private InstanceAvailability determineInstanceAvailability(RingInstance
instance)
{
- List<NodeSettings> allNodeSettings =
FutureUtils.bestEffortGet(allNodeSettingFutures,
-
conf.getSidecarRequestMaxRetryDelayInSeconds(),
-
TimeUnit.SECONDS);
-
- if (allNodeSettings.isEmpty())
+ if (!instanceIsUp(instance.getRingInstance()))
{
- throw new RuntimeException(String.format("Unable to determine the
node settings. 0/%d instances available.",
-
allNodeSettingFutures.size()));
+ return InstanceAvailability.UNAVAILABLE_DOWN;
}
- else if (allNodeSettings.size() < allNodeSettingFutures.size())
+ if (instanceIsBlocked(instance))
{
- LOGGER.warn("{}/{} instances were used to determine the node
settings",
- allNodeSettings.size(), allNodeSettingFutures.size());
+ return InstanceAvailability.UNAVAILABLE_BLOCKED;
}
-
- return allNodeSettings;
- }
-
- public String getVersionFromSidecar()
- {
- NodeSettings nodeSettings = this.nodeSettings.get();
- if (nodeSettings != null)
+ if (instanceIsNormal(instance.getRingInstance()) ||
+ instanceIsTransitioning(instance.getRingInstance()) ||
+ instanceIsBeingReplaced(instance.getRingInstance()))
{
- return nodeSettings.releaseVersion();
+ return InstanceAvailability.AVAILABLE;
}
- return getLowestVersion(getAllNodeSettings());
+ LOGGER.info("No valid state found for instance {}", instance);
+ // If it's not one of the above, it's inherently INVALID.
+ return InstanceAvailability.INVALID_STATE;
}
- protected RingResponse getRingResponse()
+ private TokenRangeMapping<RingInstance> getTokenRangeReplicas()
{
- RingResponse currentRingResponse = ringResponse;
- if (currentRingResponse != null)
+ Map<String, Set<String>> writeReplicasByDC;
+ Map<String, Set<String>> pendingReplicasByDC;
+ List<ReplicaMetadata> replicaMetadata;
+ Set<RingInstance> blockedInstances;
+ Set<RingInstance> replacementInstances;
+ Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance;
+ try
{
- return currentRingResponse;
- }
+ TokenRangeReplicasResponse response =
getTokenRangesAndReplicaSets();
+ replicaMetadata = response.replicaMetadata();
- synchronized (this)
- {
- if (ringResponse == null)
+ tokenRangesByInstance =
getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata());
+ LOGGER.info("Retrieved token ranges for {} instances from write
replica set ",
+ tokenRangesByInstance.size());
+
+ replacementInstances = response.replicaMetadata()
+ .stream()
+ .filter(m ->
m.state().equalsIgnoreCase(InstanceState.REPLACING.toString()))
+ .map(RingInstance::new)
+ .collect(Collectors.toSet());
+
+ blockedInstances = response.replicaMetadata().stream()
+ .map(RingInstance::new)
+ .filter(this::instanceIsBlocked)
+ .collect(Collectors.toSet());
+
+ Set<String> blockedIps = blockedInstances.stream().map(i ->
i.getRingInstance().address())
+
.collect(Collectors.toSet());
+
+ // Each token range has hosts by DC. We collate them across all
ranges into all hosts by DC
+ writeReplicasByDC = response.writeReplicas()
+ .stream()
+ .flatMap(wr ->
wr.replicasByDatacenter().entrySet().stream())
+
.collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue()),
+ (l1, l2) ->
filterAndMergeInstances(l1, l2, blockedIps)));
+
+ pendingReplicasByDC = getPendingReplicas(response,
writeReplicasByDC);
+
+ if (LOGGER.isDebugEnabled())
{
- try
- {
- ringResponse = getCurrentRingResponse();
- }
- catch (Exception exception)
- {
- LOGGER.error("Failed to load Cassandra ring", exception);
- throw new RuntimeException(exception);
- }
+ LOGGER.debug("Fetched token-ranges with dcs={},
write_replica_count={}, pending_replica_count={}",
+ writeReplicasByDC.keySet(),
+
writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(),
+
pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size());
}
- return ringResponse;
}
- }
+ catch (ExecutionException | InterruptedException exception)
+ {
+ LOGGER.error("Failed to get token ranges, ", exception);
+ throw new RuntimeException(exception);
+ }
- private RingResponse getCurrentRingResponse() throws Exception
- {
- return
getCassandraContext().getSidecarClient().ring(conf.keyspace).get();
+ // Include availability info so CL checks can use it to exclude
replacement hosts
+ return new TokenRangeMapping<>(getPartitioner(),
+ getReplicationFactor(),
+ writeReplicasByDC,
+ pendingReplicasByDC,
+ tokenRangesByInstance,
+ replicaMetadata,
+ blockedInstances,
+ replacementInstances);
}
- private static List<RingInstance> getSerializableInstances(RingResponse
ringResponse)
+ private Set<String> filterAndMergeInstances(Set<String> instancesList1,
Set<String> instancesList2, Set<String> blockedIPs)
{
- return ringResponse.stream()
- .map(RingInstance::new)
- .collect(Collectors.toList());
+ Set<String> merged = new HashSet<>();
+ // Removes blocked instances. If this is included, remove
blockedInstances from CL checks
+ merged.addAll(instancesList1.stream().filter(i ->
!blockedIPs.contains(i)).collect(Collectors.toSet()));
+ merged.addAll(instancesList2.stream().filter(i ->
!blockedIPs.contains(i)).collect(Collectors.toSet()));
+
+ return merged;
}
- private static RingInstance getCasInstanceMethodsImpl(RingEntry ringEntry)
+ // Pending replicas are currently calculated by extracting the
non-read-replicas from the write-replica-set
+ // This will be replaced by the instance state metadata when it is
supported by the token-ranges API
+ private Map<String, Set<String>>
getPendingReplicas(TokenRangeReplicasResponse response, Map<String,
Set<String>> writeReplicasByDC)
{
- return new RingInstance(ringEntry);
+ Set<String> readReplicas =
readReplicasFromTokenRangeResponse(response);
+ return writeReplicasByDC.entrySet()
+ .stream()
+ .filter(entry ->
entry.getValue().stream().noneMatch(readReplicas::contains))
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
}
- protected GossipInfoResponse getGossipInfo(boolean forceRefresh)
+ private Multimap<RingInstance, Range<BigInteger>>
getTokenRangesByInstance(List<ReplicaInfo> writeReplicas,
+
List<ReplicaMetadata> replicaMetadata)
{
- GossipInfoResponse currentGossipInfo = gossipInfo;
- if (!forceRefresh && currentGossipInfo != null)
- {
- return currentGossipInfo;
- }
-
- synchronized (this)
+ Multimap<RingInstance, Range<BigInteger>> instanceToRangeMap =
ArrayListMultimap.create();
+ for (ReplicaInfo rInfo : writeReplicas)
{
- if (forceRefresh || gossipInfo == null)
+ Range<BigInteger> range = Range.openClosed(new
BigInteger(rInfo.start()), new BigInteger(rInfo.end()));
+ for (Map.Entry<String, List<String>> dcReplicaEntry :
rInfo.replicasByDatacenter().entrySet())
{
- try
- {
- gossipInfo =
cassandraContext.getSidecarClient().gossipInfo().get(conf.getHttpResponseTimeoutMs(),
-
TimeUnit.MILLISECONDS);
- }
- catch (ExecutionException | InterruptedException exception)
- {
- LOGGER.error("Failed to retrieve gossip information");
- throw new RuntimeException("Failed to retrieve gossip
information", exception);
- }
- catch (TimeoutException exception)
- {
- Thread.currentThread().interrupt();
- throw new RuntimeException("Failed to retrieve gossip
information", exception);
- }
+ // For each writeReplica, get metadata and update map to
include range
+ dcReplicaEntry.getValue().forEach(ipAddress -> {
+ // Get metadata for this IP; Create RingInstance
+                    // TODO: Temporary change to extract IP from 'ip:port' string. This will go away once
+                    // the corresponding change in sidecar is merged.
+ ReplicaMetadata replica = replicaMetadata.stream()
+ .filter(r ->
+
r.address().equals(ipAddress.split(":")[0]))
Review Comment:
What if it is an IPv6 address? Splitting on the first `:` would break for IPv6 literals.
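One way to handle both forms, sketched under the assumption that Guava is on the classpath and that IPv6 addresses arrive in the bracketed `[host]:port` form:

```java
import com.google.common.net.HostAndPort;

// Parses "10.0.0.1:9042", "[2001:db8::1]:9042" and bare hosts alike
String host = HostAndPort.fromString(ipAddress).getHost();
ReplicaMetadata replica = replicaMetadata.stream()
                                         .filter(r -> r.address().equals(host))
                                         .findFirst()
                                         .orElseThrow(() -> new IllegalStateException("No replica metadata for " + host));
```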
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java:
##########
@@ -339,140 +336,190 @@ public String getLowestCassandraVersion()
return cassandraVersion;
}
- public String getVersionFromFeature()
+ @Override
+ public Map<RingInstance, InstanceAvailability> getInstanceAvailability()
{
- return null;
+ TokenRangeMapping<RingInstance> mapping = getTokenRangeMapping(true);
+ Map<RingInstance, InstanceAvailability> result =
+ mapping.getReplicaMetadata()
+ .stream()
+ .map(RingInstance::new)
+ .collect(Collectors.toMap(Function.identity(),
this::determineInstanceAvailability));
+
+ if (LOGGER.isDebugEnabled())
+ {
+ result.forEach((inst, avail) -> LOGGER.debug("Instance {} has
availability {}", inst, avail));
+ }
+ return result;
}
- protected List<NodeSettings> getAllNodeSettings()
+ private InstanceAvailability determineInstanceAvailability(RingInstance
instance)
{
- List<NodeSettings> allNodeSettings =
FutureUtils.bestEffortGet(allNodeSettingFutures,
-
conf.getSidecarRequestMaxRetryDelayInSeconds(),
-
TimeUnit.SECONDS);
-
- if (allNodeSettings.isEmpty())
+ if (!instanceIsUp(instance.getRingInstance()))
{
- throw new RuntimeException(String.format("Unable to determine the
node settings. 0/%d instances available.",
-
allNodeSettingFutures.size()));
+ return InstanceAvailability.UNAVAILABLE_DOWN;
}
- else if (allNodeSettings.size() < allNodeSettingFutures.size())
+ if (instanceIsBlocked(instance))
{
- LOGGER.warn("{}/{} instances were used to determine the node
settings",
- allNodeSettings.size(), allNodeSettingFutures.size());
+ return InstanceAvailability.UNAVAILABLE_BLOCKED;
}
-
- return allNodeSettings;
- }
-
- public String getVersionFromSidecar()
- {
- NodeSettings nodeSettings = this.nodeSettings.get();
- if (nodeSettings != null)
+ if (instanceIsNormal(instance.getRingInstance()) ||
+ instanceIsTransitioning(instance.getRingInstance()) ||
+ instanceIsBeingReplaced(instance.getRingInstance()))
{
- return nodeSettings.releaseVersion();
+ return InstanceAvailability.AVAILABLE;
}
- return getLowestVersion(getAllNodeSettings());
+ LOGGER.info("No valid state found for instance {}", instance);
+ // If it's not one of the above, it's inherently INVALID.
+ return InstanceAvailability.INVALID_STATE;
}
- protected RingResponse getRingResponse()
+ private TokenRangeMapping<RingInstance> getTokenRangeReplicas()
{
- RingResponse currentRingResponse = ringResponse;
- if (currentRingResponse != null)
+ Map<String, Set<String>> writeReplicasByDC;
+ Map<String, Set<String>> pendingReplicasByDC;
+ List<ReplicaMetadata> replicaMetadata;
+ Set<RingInstance> blockedInstances;
+ Set<RingInstance> replacementInstances;
+ Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance;
+ try
{
- return currentRingResponse;
- }
+ TokenRangeReplicasResponse response =
getTokenRangesAndReplicaSets();
+ replicaMetadata = response.replicaMetadata();
- synchronized (this)
- {
- if (ringResponse == null)
+ tokenRangesByInstance =
getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata());
+ LOGGER.info("Retrieved token ranges for {} instances from write
replica set ",
+ tokenRangesByInstance.size());
+
+ replacementInstances = response.replicaMetadata()
+ .stream()
+ .filter(m ->
m.state().equalsIgnoreCase(InstanceState.REPLACING.toString()))
+ .map(RingInstance::new)
+ .collect(Collectors.toSet());
+
+ blockedInstances = response.replicaMetadata().stream()
+ .map(RingInstance::new)
+ .filter(this::instanceIsBlocked)
+ .collect(Collectors.toSet());
+
+ Set<String> blockedIps = blockedInstances.stream().map(i ->
i.getRingInstance().address())
+
.collect(Collectors.toSet());
+
+ // Each token range has hosts by DC. We collate them across all
ranges into all hosts by DC
+ writeReplicasByDC = response.writeReplicas()
+ .stream()
+ .flatMap(wr ->
wr.replicasByDatacenter().entrySet().stream())
+
.collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue()),
+ (l1, l2) ->
filterAndMergeInstances(l1, l2, blockedIps)));
+
+ pendingReplicasByDC = getPendingReplicas(response,
writeReplicasByDC);
+
+ if (LOGGER.isDebugEnabled())
{
- try
- {
- ringResponse = getCurrentRingResponse();
- }
- catch (Exception exception)
- {
- LOGGER.error("Failed to load Cassandra ring", exception);
- throw new RuntimeException(exception);
- }
+ LOGGER.debug("Fetched token-ranges with dcs={},
write_replica_count={}, pending_replica_count={}",
+ writeReplicasByDC.keySet(),
+
writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(),
+
pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size());
}
- return ringResponse;
}
- }
+ catch (ExecutionException | InterruptedException exception)
+ {
+ LOGGER.error("Failed to get token ranges, ", exception);
+ throw new RuntimeException(exception);
+ }
- private RingResponse getCurrentRingResponse() throws Exception
- {
- return
getCassandraContext().getSidecarClient().ring(conf.keyspace).get();
+ // Include availability info so CL checks can use it to exclude
replacement hosts
+ return new TokenRangeMapping<>(getPartitioner(),
+ getReplicationFactor(),
+ writeReplicasByDC,
+ pendingReplicasByDC,
+ tokenRangesByInstance,
+ replicaMetadata,
+ blockedInstances,
+ replacementInstances);
}
- private static List<RingInstance> getSerializableInstances(RingResponse
ringResponse)
+ private Set<String> filterAndMergeInstances(Set<String> instancesList1,
Set<String> instancesList2, Set<String> blockedIPs)
{
- return ringResponse.stream()
- .map(RingInstance::new)
- .collect(Collectors.toList());
+ Set<String> merged = new HashSet<>();
+ // Removes blocked instances. If this is included, remove
blockedInstances from CL checks
+ merged.addAll(instancesList1.stream().filter(i ->
!blockedIPs.contains(i)).collect(Collectors.toSet()));
+ merged.addAll(instancesList2.stream().filter(i ->
!blockedIPs.contains(i)).collect(Collectors.toSet()));
+
+ return merged;
}
- private static RingInstance getCasInstanceMethodsImpl(RingEntry ringEntry)
+ // Pending replicas are currently calculated by extracting the
non-read-replicas from the write-replica-set
+ // This will be replaced by the instance state metadata when it is
supported by the token-ranges API
Review Comment:
Replica metadata is exposed from the `TokenRangeReplicasResponse` now. Is
the implementation outdated?
##########
cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java:
##########
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import
o.a.c.analytics.sidecar.shaded.testing.adapters.base.StorageJmxOperations;
+import o.a.c.analytics.sidecar.shaded.testing.common.JmxClient;
+import o.a.c.analytics.sidecar.shaded.testing.common.data.QualifiedTableName;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.spark.KryoRegister;
+import org.apache.cassandra.spark.bulkwriter.BulkSparkConf;
+import org.apache.cassandra.spark.bulkwriter.DecoratedKey;
+import org.apache.cassandra.spark.bulkwriter.Tokenizer;
+import org.apache.cassandra.spark.common.schema.ColumnType;
+import org.apache.cassandra.spark.common.schema.ColumnTypes;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.DataFrameWriter;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
+import scala.Tuple2;
+
+import static junit.framework.TestCase.assertTrue;
+import static
org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack;
+import static
org.apache.cassandra.distributed.shared.NetworkTopology.networkTopology;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Base class for resiliency tests. Contains helper methods for data
generation and validation
+ */
+public abstract class ResiliencyTestBase extends IntegrationTestBase
+{
+ private static final String createTableStmt = "create table if not exists
%s (id int, course text, marks int, primary key (id));";
+ protected static final String retrieveRows = "select * from " +
TEST_KEYSPACE + ".%s";
+ public static final int rowCount = 1000;
+
+ public QualifiedTableName initializeSchema()
+ {
+ return initializeSchema(ImmutableMap.of("datacenter1", 1));
+ }
+
+ public QualifiedTableName initializeSchema(Map<String, Integer> rf)
+ {
+ createTestKeyspace(rf);
+ return createTestTable(createTableStmt);
+ }
+
+ public SparkConf generateSparkConf()
+ {
+ SparkConf sparkConf = new SparkConf()
+ .setAppName("Integration test Spark Cassandra
Bulk Reader Job")
+ .set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
+ .set("spark.master", "local[8,4]");
+ BulkSparkConf.setupSparkConf(sparkConf, true);
+ KryoRegister.setup(sparkConf);
+ return sparkConf;
+ }
+
+ public SparkSession generateSparkSession(SparkConf sparkConf)
+ {
+ return SparkSession.builder()
+ .config(sparkConf)
+ .getOrCreate();
+ }
+
+ public Set<String> getDataForRange(Range<BigInteger> range)
+ {
+ // Iterate through all data entries; filter only entries that belong
to range; convert to strings
+ return generateExpectedData().stream()
+ .filter(t -> range.contains(t._1().getToken()))
+ .map(t -> t._2()[0] + ":" + t._2()[1] + ":" + t._2()[2])
+ .collect(Collectors.toSet());
+ }
+
+ public List<Tuple2<DecoratedKey, Object[]>> generateExpectedData()
+ {
+ // "create table if not exists %s (id int, course text, marks int,
primary key (id));";
+ List<ColumnType<?>> columnTypes = Arrays.asList(ColumnTypes.INT);
+ Tokenizer tokenizer = new Tokenizer(Arrays.asList(0),
+ Arrays.asList("id"),
+ columnTypes,
+ true
+ );
+ return IntStream.range(0, rowCount).mapToObj(recordNum -> {
+ Object[] columns = new Object[]
+ {
+ recordNum, "course" + recordNum, recordNum
+ };
+ return Tuple2.apply(tokenizer.getDecoratedKey(columns), columns);
+ }).collect(Collectors.toList());
+ }
+
+ public Map<IUpgradeableInstance, Set<String>>
getInstanceData(List<IUpgradeableInstance> instances,
+ boolean
isPending)
+ {
+
+ return instances.stream().collect(Collectors.toMap(Function.identity(),
+ i ->
filterTokenRangeData(getRangesForInstance(i, isPending))));
+ }
+
+ public Set<String> filterTokenRangeData(List<Range<BigInteger>> ranges)
+ {
+ return ranges.stream()
+ .map(r -> getDataForRange(r))
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet());
+ }
+
+ private List<Range<BigInteger>> getRangesForInstance(IUpgradeableInstance
instance, boolean isPending)
+ {
+ IInstanceConfig config = instance.config();
+ JmxClient client = JmxClient.builder()
+
.host(config.broadcastAddress().getAddress().getHostAddress())
+ .port(config.jmxPort())
+ .build();
+ StorageJmxOperations ss = client.proxy(StorageJmxOperations.class,
"org.apache.cassandra.db:type=StorageService");
+
+ Map<List<String>, List<String>> ranges = isPending ?
ss.getPendingRangeToEndpointWithPortMap(TEST_KEYSPACE)
+ :
ss.getRangeToEndpointWithPortMap(TEST_KEYSPACE);
+
+ // filter ranges that belong to the instance
+ return ranges.entrySet()
+ .stream()
+ .filter(e ->
e.getValue().contains(instance.broadcastAddress().getAddress().getHostAddress()
+ + ":" +
instance.broadcastAddress().getPort()))
+ .map(e -> unwrapRanges(e.getKey()))
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
Review Comment:
nit: the stream chain is not aligned.
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java:
##########
@@ -339,140 +336,190 @@ public String getLowestCassandraVersion()
return cassandraVersion;
}
- public String getVersionFromFeature()
+ @Override
+ public Map<RingInstance, InstanceAvailability> getInstanceAvailability()
{
- return null;
+ TokenRangeMapping<RingInstance> mapping = getTokenRangeMapping(true);
+ Map<RingInstance, InstanceAvailability> result =
+ mapping.getReplicaMetadata()
+ .stream()
+ .map(RingInstance::new)
+ .collect(Collectors.toMap(Function.identity(),
this::determineInstanceAvailability));
+
+ if (LOGGER.isDebugEnabled())
+ {
+ result.forEach((inst, avail) -> LOGGER.debug("Instance {} has
availability {}", inst, avail));
+ }
+ return result;
}
- protected List<NodeSettings> getAllNodeSettings()
+ private InstanceAvailability determineInstanceAvailability(RingInstance
instance)
{
- List<NodeSettings> allNodeSettings =
FutureUtils.bestEffortGet(allNodeSettingFutures,
-
conf.getSidecarRequestMaxRetryDelayInSeconds(),
-
TimeUnit.SECONDS);
-
- if (allNodeSettings.isEmpty())
+ if (!instanceIsUp(instance.getRingInstance()))
{
- throw new RuntimeException(String.format("Unable to determine the
node settings. 0/%d instances available.",
-
allNodeSettingFutures.size()));
+ return InstanceAvailability.UNAVAILABLE_DOWN;
}
- else if (allNodeSettings.size() < allNodeSettingFutures.size())
+ if (instanceIsBlocked(instance))
{
- LOGGER.warn("{}/{} instances were used to determine the node
settings",
- allNodeSettings.size(), allNodeSettingFutures.size());
+ return InstanceAvailability.UNAVAILABLE_BLOCKED;
}
-
- return allNodeSettings;
- }
-
- public String getVersionFromSidecar()
- {
- NodeSettings nodeSettings = this.nodeSettings.get();
- if (nodeSettings != null)
+ if (instanceIsNormal(instance.getRingInstance()) ||
+ instanceIsTransitioning(instance.getRingInstance()) ||
+ instanceIsBeingReplaced(instance.getRingInstance()))
{
- return nodeSettings.releaseVersion();
+ return InstanceAvailability.AVAILABLE;
}
- return getLowestVersion(getAllNodeSettings());
+ LOGGER.info("No valid state found for instance {}", instance);
+ // If it's not one of the above, it's inherently INVALID.
+ return InstanceAvailability.INVALID_STATE;
}
- protected RingResponse getRingResponse()
+ private TokenRangeMapping<RingInstance> getTokenRangeReplicas()
{
- RingResponse currentRingResponse = ringResponse;
- if (currentRingResponse != null)
+ Map<String, Set<String>> writeReplicasByDC;
+ Map<String, Set<String>> pendingReplicasByDC;
+ List<ReplicaMetadata> replicaMetadata;
+ Set<RingInstance> blockedInstances;
+ Set<RingInstance> replacementInstances;
+ Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance;
+ try
{
- return currentRingResponse;
- }
+ TokenRangeReplicasResponse response =
getTokenRangesAndReplicaSets();
+ replicaMetadata = response.replicaMetadata();
- synchronized (this)
- {
- if (ringResponse == null)
+ tokenRangesByInstance =
getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata());
+ LOGGER.info("Retrieved token ranges for {} instances from write
replica set ",
+ tokenRangesByInstance.size());
+
+ replacementInstances = response.replicaMetadata()
+ .stream()
+ .filter(m ->
m.state().equalsIgnoreCase(InstanceState.REPLACING.toString()))
+ .map(RingInstance::new)
+ .collect(Collectors.toSet());
+
+ blockedInstances = response.replicaMetadata().stream()
+ .map(RingInstance::new)
+ .filter(this::instanceIsBlocked)
+ .collect(Collectors.toSet());
+
+ Set<String> blockedIps = blockedInstances.stream().map(i ->
i.getRingInstance().address())
+
.collect(Collectors.toSet());
+
+ // Each token range has hosts by DC. We collate them across all
ranges into all hosts by DC
+ writeReplicasByDC = response.writeReplicas()
+ .stream()
+ .flatMap(wr ->
wr.replicasByDatacenter().entrySet().stream())
+
.collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue()),
+ (l1, l2) ->
filterAndMergeInstances(l1, l2, blockedIps)));
+
+ pendingReplicasByDC = getPendingReplicas(response,
writeReplicasByDC);
+
+ if (LOGGER.isDebugEnabled())
{
- try
- {
- ringResponse = getCurrentRingResponse();
- }
- catch (Exception exception)
- {
- LOGGER.error("Failed to load Cassandra ring", exception);
- throw new RuntimeException(exception);
- }
+ LOGGER.debug("Fetched token-ranges with dcs={},
write_replica_count={}, pending_replica_count={}",
+ writeReplicasByDC.keySet(),
+
writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(),
+
pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size());
}
- return ringResponse;
}
- }
+ catch (ExecutionException | InterruptedException exception)
+ {
+ LOGGER.error("Failed to get token ranges, ", exception);
+ throw new RuntimeException(exception);
+ }
- private RingResponse getCurrentRingResponse() throws Exception
- {
- return
getCassandraContext().getSidecarClient().ring(conf.keyspace).get();
+ // Include availability info so CL checks can use it to exclude
replacement hosts
+ return new TokenRangeMapping<>(getPartitioner(),
+ getReplicationFactor(),
+ writeReplicasByDC,
+ pendingReplicasByDC,
+ tokenRangesByInstance,
+ replicaMetadata,
+ blockedInstances,
+ replacementInstances);
}
- private static List<RingInstance> getSerializableInstances(RingResponse
ringResponse)
+ private Set<String> filterAndMergeInstances(Set<String> instancesList1,
Set<String> instancesList2, Set<String> blockedIPs)
{
- return ringResponse.stream()
- .map(RingInstance::new)
- .collect(Collectors.toList());
+ Set<String> merged = new HashSet<>();
+ // Removes blocked instances. If this is included, remove
blockedInstances from CL checks
+ merged.addAll(instancesList1.stream().filter(i ->
!blockedIPs.contains(i)).collect(Collectors.toSet()));
+ merged.addAll(instancesList2.stream().filter(i ->
!blockedIPs.contains(i)).collect(Collectors.toSet()));
+
+ return merged;
}
- private static RingInstance getCasInstanceMethodsImpl(RingEntry ringEntry)
+ // Pending replicas are currently calculated by extracting the
non-read-replicas from the write-replica-set
+ // This will be replaced by the instance state metadata when it is
supported by the token-ranges API
+ private Map<String, Set<String>>
getPendingReplicas(TokenRangeReplicasResponse response, Map<String,
Set<String>> writeReplicasByDC)
{
- return new RingInstance(ringEntry);
+ Set<String> readReplicas =
readReplicasFromTokenRangeResponse(response);
+ return writeReplicasByDC.entrySet()
+ .stream()
+ .filter(entry ->
entry.getValue().stream().noneMatch(readReplicas::contains))
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
}
- protected GossipInfoResponse getGossipInfo(boolean forceRefresh)
+ private Multimap<RingInstance, Range<BigInteger>>
getTokenRangesByInstance(List<ReplicaInfo> writeReplicas,
+
List<ReplicaMetadata> replicaMetadata)
{
- GossipInfoResponse currentGossipInfo = gossipInfo;
- if (!forceRefresh && currentGossipInfo != null)
- {
- return currentGossipInfo;
- }
-
- synchronized (this)
+ Multimap<RingInstance, Range<BigInteger>> instanceToRangeMap =
ArrayListMultimap.create();
+ for (ReplicaInfo rInfo : writeReplicas)
{
- if (forceRefresh || gossipInfo == null)
+ Range<BigInteger> range = Range.openClosed(new
BigInteger(rInfo.start()), new BigInteger(rInfo.end()));
+ for (Map.Entry<String, List<String>> dcReplicaEntry :
rInfo.replicasByDatacenter().entrySet())
{
- try
- {
- gossipInfo =
cassandraContext.getSidecarClient().gossipInfo().get(conf.getHttpResponseTimeoutMs(),
-
TimeUnit.MILLISECONDS);
- }
- catch (ExecutionException | InterruptedException exception)
- {
- LOGGER.error("Failed to retrieve gossip information");
- throw new RuntimeException("Failed to retrieve gossip
information", exception);
- }
- catch (TimeoutException exception)
- {
- Thread.currentThread().interrupt();
- throw new RuntimeException("Failed to retrieve gossip
information", exception);
- }
+ // For each writeReplica, get metadata and update map to
include range
+ dcReplicaEntry.getValue().forEach(ipAddress -> {
+ // Get metadata for this IP; Create RingInstance
+                    // TODO: Temporary change to extract IP from 'ip:port' string. This will go away once
+                    // the corresponding change in sidecar is merged.
Review Comment:
Is the TODO resolved now?
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java:
##########
@@ -339,140 +336,190 @@ public String getLowestCassandraVersion()
return cassandraVersion;
}
- public String getVersionFromFeature()
+ @Override
+ public Map<RingInstance, InstanceAvailability> getInstanceAvailability()
{
- return null;
+ TokenRangeMapping<RingInstance> mapping = getTokenRangeMapping(true);
+ Map<RingInstance, InstanceAvailability> result =
+ mapping.getReplicaMetadata()
+ .stream()
+ .map(RingInstance::new)
+ .collect(Collectors.toMap(Function.identity(),
this::determineInstanceAvailability));
+
+ if (LOGGER.isDebugEnabled())
+ {
+ result.forEach((inst, avail) -> LOGGER.debug("Instance {} has
availability {}", inst, avail));
+ }
+ return result;
}
- protected List<NodeSettings> getAllNodeSettings()
+ private InstanceAvailability determineInstanceAvailability(RingInstance
instance)
{
- List<NodeSettings> allNodeSettings =
FutureUtils.bestEffortGet(allNodeSettingFutures,
-
conf.getSidecarRequestMaxRetryDelayInSeconds(),
-
TimeUnit.SECONDS);
-
- if (allNodeSettings.isEmpty())
+ if (!instanceIsUp(instance.getRingInstance()))
{
- throw new RuntimeException(String.format("Unable to determine the
node settings. 0/%d instances available.",
-
allNodeSettingFutures.size()));
+ return InstanceAvailability.UNAVAILABLE_DOWN;
}
- else if (allNodeSettings.size() < allNodeSettingFutures.size())
+ if (instanceIsBlocked(instance))
{
- LOGGER.warn("{}/{} instances were used to determine the node
settings",
- allNodeSettings.size(), allNodeSettingFutures.size());
+ return InstanceAvailability.UNAVAILABLE_BLOCKED;
}
-
- return allNodeSettings;
- }
-
- public String getVersionFromSidecar()
- {
- NodeSettings nodeSettings = this.nodeSettings.get();
- if (nodeSettings != null)
+ if (instanceIsNormal(instance.getRingInstance()) ||
+ instanceIsTransitioning(instance.getRingInstance()) ||
+ instanceIsBeingReplaced(instance.getRingInstance()))
{
- return nodeSettings.releaseVersion();
+ return InstanceAvailability.AVAILABLE;
}
- return getLowestVersion(getAllNodeSettings());
+ LOGGER.info("No valid state found for instance {}", instance);
+ // If it's not one of the above, it's inherently INVALID.
+ return InstanceAvailability.INVALID_STATE;
}
- protected RingResponse getRingResponse()
+ private TokenRangeMapping<RingInstance> getTokenRangeReplicas()
{
- RingResponse currentRingResponse = ringResponse;
- if (currentRingResponse != null)
+ Map<String, Set<String>> writeReplicasByDC;
+ Map<String, Set<String>> pendingReplicasByDC;
+ List<ReplicaMetadata> replicaMetadata;
+ Set<RingInstance> blockedInstances;
+ Set<RingInstance> replacementInstances;
+ Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance;
+ try
{
- return currentRingResponse;
- }
+ TokenRangeReplicasResponse response =
getTokenRangesAndReplicaSets();
+ replicaMetadata = response.replicaMetadata();
- synchronized (this)
- {
- if (ringResponse == null)
+ tokenRangesByInstance =
getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata());
+ LOGGER.info("Retrieved token ranges for {} instances from write
replica set ",
+ tokenRangesByInstance.size());
+
+ replacementInstances = response.replicaMetadata()
+ .stream()
+ .filter(m ->
m.state().equalsIgnoreCase(InstanceState.REPLACING.toString()))
+ .map(RingInstance::new)
+ .collect(Collectors.toSet());
+
+ blockedInstances = response.replicaMetadata().stream()
+ .map(RingInstance::new)
+ .filter(this::instanceIsBlocked)
+ .collect(Collectors.toSet());
+
+ Set<String> blockedIps = blockedInstances.stream().map(i ->
i.getRingInstance().address())
+
.collect(Collectors.toSet());
+
+ // Each token range has hosts by DC. We collate them across all
ranges into all hosts by DC
+ writeReplicasByDC = response.writeReplicas()
+ .stream()
+ .flatMap(wr ->
wr.replicasByDatacenter().entrySet().stream())
+
.collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue()),
+ (l1, l2) ->
filterAndMergeInstances(l1, l2, blockedIps)));
+
+ pendingReplicasByDC = getPendingReplicas(response,
writeReplicasByDC);
+
+ if (LOGGER.isDebugEnabled())
{
- try
- {
- ringResponse = getCurrentRingResponse();
- }
- catch (Exception exception)
- {
- LOGGER.error("Failed to load Cassandra ring", exception);
- throw new RuntimeException(exception);
- }
+ LOGGER.debug("Fetched token-ranges with dcs={},
write_replica_count={}, pending_replica_count={}",
+ writeReplicasByDC.keySet(),
+
writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(),
+
pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size());
}
- return ringResponse;
}
- }
+ catch (ExecutionException | InterruptedException exception)
+ {
+ LOGGER.error("Failed to get token ranges, ", exception);
+ throw new RuntimeException(exception);
+ }
- private RingResponse getCurrentRingResponse() throws Exception
- {
- return
getCassandraContext().getSidecarClient().ring(conf.keyspace).get();
+ // Include availability info so CL checks can use it to exclude
replacement hosts
+ return new TokenRangeMapping<>(getPartitioner(),
+ getReplicationFactor(),
+ writeReplicasByDC,
+ pendingReplicasByDC,
+ tokenRangesByInstance,
+ replicaMetadata,
+ blockedInstances,
+ replacementInstances);
}
- private static List<RingInstance> getSerializableInstances(RingResponse
ringResponse)
+ private Set<String> filterAndMergeInstances(Set<String> instancesList1,
Set<String> instancesList2, Set<String> blockedIPs)
{
- return ringResponse.stream()
- .map(RingInstance::new)
- .collect(Collectors.toList());
+ Set<String> merged = new HashSet<>();
+ // Removes blocked instances. If this is included, remove
blockedInstances from CL checks
+ merged.addAll(instancesList1.stream().filter(i ->
!blockedIPs.contains(i)).collect(Collectors.toSet()));
+ merged.addAll(instancesList2.stream().filter(i ->
!blockedIPs.contains(i)).collect(Collectors.toSet()));
+
+ return merged;
}
- private static RingInstance getCasInstanceMethodsImpl(RingEntry ringEntry)
+ // Pending replicas are currently calculated by extracting the
non-read-replicas from the write-replica-set
+ // This will be replaced by the instance state metadata when it is
supported by the token-ranges API
+ private Map<String, Set<String>>
getPendingReplicas(TokenRangeReplicasResponse response, Map<String,
Set<String>> writeReplicasByDC)
{
- return new RingInstance(ringEntry);
+ Set<String> readReplicas =
readReplicasFromTokenRangeResponse(response);
+ return writeReplicasByDC.entrySet()
+ .stream()
+ .filter(entry ->
entry.getValue().stream().noneMatch(readReplicas::contains))
Review Comment:
If my understanding is correct, this method is meant to find the pending nodes. If
so, the filter that keeps a DC entry only when none of its instances is contained
in `readReplicas` will essentially always evaluate to `false`. The predicate feels
wrong to me.
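For illustration, if the intent is "write replicas that are not read replicas", a per-instance filter along these lines might be closer to it. This is only a hedged sketch reusing the identifiers from the diff, not the author's fix:
```java
// Sketch only: keep, per DC, the write replicas that are not also read replicas.
private Map<String, Set<String>> getPendingReplicas(TokenRangeReplicasResponse response,
                                                    Map<String, Set<String>> writeReplicasByDC)
{
    Set<String> readReplicas = readReplicasFromTokenRangeResponse(response);
    return writeReplicasByDC.entrySet()
                            .stream()
                            .collect(Collectors.toMap(Map.Entry::getKey,
                                                      entry -> entry.getValue()
                                                                    .stream()
                                                                    .filter(host -> !readReplicas.contains(host))
                                                                    .collect(Collectors.toSet())));
}
```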
##########
cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningBaseTest.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics.expansion;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import o.a.c.analytics.sidecar.shaded.testing.common.data.QualifiedTableName;
+import org.apache.cassandra.analytics.ResiliencyTestBase;
+import org.apache.cassandra.analytics.TestTokenSupplier;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+
+import static junit.framework.TestCase.assertNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class JoiningBaseTest extends ResiliencyTestBase
Review Comment:
Should this be an abstract class?
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##########
@@ -175,30 +324,31 @@ public void writeRow(Map<String, Object> valueMap,
}
}
- void checkBatchSize(StreamSession streamSession, int partitionId, JobInfo
job) throws IOException
+ /**
+ * Stream to replicas; if batchSize is reached, "finalize" SST to
"schedule" streamSession
+ */
+ private void checkBatchSize(final StreamSession streamSession, final int
partitionId, final JobInfo job) throws IOException
Review Comment:
drop unnecessary `final` in the method parameters.
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##########
@@ -136,35 +181,139 @@ public StreamResult write(Iterator<Tuple2<DecoratedKey,
Object[]>> sourceIterato
}
}
+ private Map<Range<BigInteger>, List<RingInstance>>
taskTokenRangeMapping(TokenRangeMapping<RingInstance> tokenRange,
+
Range<BigInteger> taskTokenRange)
+ {
+ return tokenRange.getSubRanges(taskTokenRange).asMapOfRanges();
+ }
+
+ private Set<RingInstance> instancesFromMapping(Map<Range<BigInteger>,
List<RingInstance>> mapping)
+ {
+ return mapping.values()
+ .stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet());
+ }
+
+ /**
+ * Creates a new session if we have the current token range intersecting
the ranges from write replica-set.
+ * If we do find the need to split a range into sub-ranges, we create the
corresponding session for the sub-range
+ * if the token from the row data belongs to the range.
+ */
+ private StreamSession maybeCreateStreamSession(TaskContext taskContext,
+ StreamSession streamSession,
+ Tuple2<DecoratedKey,
Object[]> rowData,
+ Set<Range<BigInteger>>
subRanges,
+
ReplicaAwareFailureHandler<RingInstance> failureHandler,
+ List<StreamResult> results)
+ throws IOException, ExecutionException, InterruptedException
+ {
+ BigInteger token = rowData._1().getToken();
+ Range<BigInteger> tokenRange = getTokenRange(taskContext);
+
+ Preconditions.checkState(tokenRange.contains(token),
+ String.format("Received Token %s outside of
expected range %s", token, tokenRange));
+
+ // We have split ranges likely resulting from pending nodes
+ // Evaluate creating a new session if the token from current row is
part of a sub-range
+ if (subRanges.size() > 1)
+ {
+ // Create session using sub-range that contains the token from
current row
+ Range<BigInteger> matchingSubRange = subRanges.stream().filter(r
-> r.contains(token)).findFirst().get();
+ Preconditions.checkState(matchingSubRange != null,
+ String.format("Received Token %s outside
of expected range %s", token, matchingSubRange));
+ streamSession = maybeCreateSubRangeSession(taskContext,
streamSession, failureHandler, results, matchingSubRange);
+ }
+
+ // If we do not have any stream session at this point, we create a
session using the partition's token range
+ return (streamSession == null) ? createStreamSession(taskContext) :
streamSession;
+ }
+
+ /**
+ * Given that the token belongs to a sub-range, creates a new stream
session if either
+ * 1) we do not have an existing stream session, or 2) the existing stream
session corresponds to a range that
+ * does NOT match the sub-range the token belongs to.
+ */
+ private StreamSession maybeCreateSubRangeSession(TaskContext taskContext,
+ StreamSession
streamSession,
+
ReplicaAwareFailureHandler<RingInstance> failureHandler,
+ List<StreamResult>
results,
+ Range<BigInteger>
matchingSubRange)
+ throws IOException, ExecutionException, InterruptedException
+ {
+ if (streamSession == null || streamSession.getTokenRange() !=
matchingSubRange)
+ {
+ LOGGER.debug("[{}] Creating stream session for range: {}",
taskContext.partitionId(), matchingSubRange);
+ // Schedule data to be sent if we are processing a batch that has
not been scheduled yet.
+ if (streamSession != null)
+ {
+ // Complete existing batched writes (if any) before the
existing stream session is closed
+ if (batchSize != 0)
+ {
+ finalizeSSTable(streamSession, taskContext.partitionId(),
sstableWriter, batchNumber, batchSize);
+ sstableWriter = null;
+ batchSize = 0;
+ }
+ results.add(streamSession.close());
+ }
+ streamSession = new StreamSession(writerContext,
getStreamId(taskContext), matchingSubRange, failureHandler);
+ }
+ return streamSession;
+ }
+
+ /**
+ * Get ranges from the set that intersect and/or overlap with the provided
token range
+ */
+ private Set<Range<BigInteger>>
getIntersectingSubRanges(Set<Range<BigInteger>> ranges, Range<BigInteger>
tokenRange)
+ {
+ return ranges.stream()
+ .filter(r -> r.isConnected(tokenRange) &&
!r.intersection(tokenRange).isEmpty())
+ .collect(Collectors.toSet());
+ }
+
+ private boolean
haveTokenRangeMappingsChanged(TokenRangeMapping<RingInstance> startTaskMapping,
TaskContext taskContext)
+ {
+ Range<BigInteger> taskTokenRange = getTokenRange(taskContext);
+ // Get the uncached, current view of the ring to compare with initial
ring
+ TokenRangeMapping<RingInstance> endTaskMapping =
writerContext.cluster().getTokenRangeMapping(false);
+ Map<Range<BigInteger>, List<RingInstance>> startMapping =
taskTokenRangeMapping(startTaskMapping, taskTokenRange);
+ Map<Range<BigInteger>, List<RingInstance>> endMapping =
taskTokenRangeMapping(endTaskMapping, taskTokenRange);
+
+ // Token ranges are identical and overall instance list is same
+ return !(startMapping.keySet().equals(endMapping.keySet()) &&
+
instancesFromMapping(startMapping).equals(instancesFromMapping(endMapping)));
+ }
+
private void validateAcceptableTimeSkewOrThrow(List<RingInstance> replicas)
{
- TimeSkewResponse timeSkewResponse =
writerContext.cluster().getTimeSkew(replicas);
- Instant localNow = Instant.now();
- Instant remoteNow = Instant.ofEpochMilli(timeSkewResponse.currentTime);
- Duration range =
Duration.ofMinutes(timeSkewResponse.allowableSkewInMinutes);
- if (localNow.isBefore(remoteNow.minus(range)) ||
localNow.isAfter(remoteNow.plus(range)))
+ if (!replicas.isEmpty())
{
- String message = String.format("Time skew between Spark and
Cassandra is too large. "
- + "Allowable skew is %d minutes. "
- + "Spark executor time is %s,
Cassandra instance time is %s",
-
timeSkewResponse.allowableSkewInMinutes, localNow, remoteNow);
- throw new UnsupportedOperationException(message);
+ TimeSkewResponse timeSkewResponse =
writerContext.cluster().getTimeSkew(replicas);
+ Instant localNow = Instant.now();
+ Instant remoteNow =
Instant.ofEpochMilli(timeSkewResponse.currentTime);
+ Duration range =
Duration.ofMinutes(timeSkewResponse.allowableSkewInMinutes);
+ if (localNow.isBefore(remoteNow.minus(range)) ||
localNow.isAfter(remoteNow.plus(range)))
+ {
+ final String message = String.format("Time skew between Spark
and Cassandra is too large. "
+ + "Allowable skew is %d
minutes. Spark executor time is %s, Cassandra instance time is %s",
+
timeSkewResponse.allowableSkewInMinutes, localNow, remoteNow);
+ throw new UnsupportedOperationException(message);
+ }
Review Comment:
looks like it only adds the check for whether `replicas` is empty. The
following has less indentation.
```java
private void validateAcceptableTimeSkewOrThrow(List<RingInstance>
replicas)
{
if (replicas.isEmpty())
{
return;
}
// original logic
}
```
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##########
@@ -110,20 +133,42 @@ public StreamResult write(Iterator<Tuple2<DecoratedKey,
Object[]>> sourceIterato
Map<String, Object> valueMap = new HashMap<>();
try
{
+ Set<Range<BigInteger>> newRanges =
initialTokenRangeMapping.getRangeMap().asMapOfRanges().entrySet()
+
.stream()
+
.map(Map.Entry::getKey)
+
.collect(Collectors.toSet());
Review Comment:
nit: it is simpler to get a copy of the key set like this:
```suggestion
Set<Range<BigInteger>> newRanges = new
HashSet<>(initialTokenRangeMapping.getRangeMap().asMapOfRanges().keySet());
```
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java:
##########
@@ -339,140 +336,190 @@ public String getLowestCassandraVersion()
return cassandraVersion;
}
- public String getVersionFromFeature()
+ @Override
+ public Map<RingInstance, InstanceAvailability> getInstanceAvailability()
{
- return null;
+ TokenRangeMapping<RingInstance> mapping = getTokenRangeMapping(true);
+ Map<RingInstance, InstanceAvailability> result =
+ mapping.getReplicaMetadata()
+ .stream()
+ .map(RingInstance::new)
+ .collect(Collectors.toMap(Function.identity(),
this::determineInstanceAvailability));
+
+ if (LOGGER.isDebugEnabled())
+ {
+ result.forEach((inst, avail) -> LOGGER.debug("Instance {} has
availability {}", inst, avail));
+ }
+ return result;
}
- protected List<NodeSettings> getAllNodeSettings()
+ private InstanceAvailability determineInstanceAvailability(RingInstance
instance)
{
- List<NodeSettings> allNodeSettings =
FutureUtils.bestEffortGet(allNodeSettingFutures,
-
conf.getSidecarRequestMaxRetryDelayInSeconds(),
-
TimeUnit.SECONDS);
-
- if (allNodeSettings.isEmpty())
+ if (!instanceIsUp(instance.getRingInstance()))
{
- throw new RuntimeException(String.format("Unable to determine the
node settings. 0/%d instances available.",
-
allNodeSettingFutures.size()));
+ return InstanceAvailability.UNAVAILABLE_DOWN;
}
- else if (allNodeSettings.size() < allNodeSettingFutures.size())
+ if (instanceIsBlocked(instance))
{
- LOGGER.warn("{}/{} instances were used to determine the node
settings",
- allNodeSettings.size(), allNodeSettingFutures.size());
+ return InstanceAvailability.UNAVAILABLE_BLOCKED;
}
-
- return allNodeSettings;
- }
-
- public String getVersionFromSidecar()
- {
- NodeSettings nodeSettings = this.nodeSettings.get();
- if (nodeSettings != null)
+ if (instanceIsNormal(instance.getRingInstance()) ||
+ instanceIsTransitioning(instance.getRingInstance()) ||
+ instanceIsBeingReplaced(instance.getRingInstance()))
{
- return nodeSettings.releaseVersion();
+ return InstanceAvailability.AVAILABLE;
}
- return getLowestVersion(getAllNodeSettings());
+ LOGGER.info("No valid state found for instance {}", instance);
+ // If it's not one of the above, it's inherently INVALID.
+ return InstanceAvailability.INVALID_STATE;
}
- protected RingResponse getRingResponse()
+ private TokenRangeMapping<RingInstance> getTokenRangeReplicas()
{
- RingResponse currentRingResponse = ringResponse;
- if (currentRingResponse != null)
+ Map<String, Set<String>> writeReplicasByDC;
+ Map<String, Set<String>> pendingReplicasByDC;
+ List<ReplicaMetadata> replicaMetadata;
+ Set<RingInstance> blockedInstances;
+ Set<RingInstance> replacementInstances;
+ Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance;
+ try
{
- return currentRingResponse;
- }
+ TokenRangeReplicasResponse response =
getTokenRangesAndReplicaSets();
+ replicaMetadata = response.replicaMetadata();
- synchronized (this)
- {
- if (ringResponse == null)
+ tokenRangesByInstance =
getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata());
+ LOGGER.info("Retrieved token ranges for {} instances from write
replica set ",
+ tokenRangesByInstance.size());
+
+ replacementInstances = response.replicaMetadata()
+ .stream()
+ .filter(m ->
m.state().equalsIgnoreCase(InstanceState.REPLACING.toString()))
+ .map(RingInstance::new)
+ .collect(Collectors.toSet());
+
+ blockedInstances = response.replicaMetadata().stream()
+ .map(RingInstance::new)
+ .filter(this::instanceIsBlocked)
+ .collect(Collectors.toSet());
+
+ Set<String> blockedIps = blockedInstances.stream().map(i ->
i.getRingInstance().address())
+
.collect(Collectors.toSet());
+
+ // Each token range has hosts by DC. We collate them across all
ranges into all hosts by DC
+ writeReplicasByDC = response.writeReplicas()
+ .stream()
+ .flatMap(wr ->
wr.replicasByDatacenter().entrySet().stream())
+
.collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue()),
+ (l1, l2) ->
filterAndMergeInstances(l1, l2, blockedIps)));
+
+ pendingReplicasByDC = getPendingReplicas(response,
writeReplicasByDC);
+
+ if (LOGGER.isDebugEnabled())
{
- try
- {
- ringResponse = getCurrentRingResponse();
- }
- catch (Exception exception)
- {
- LOGGER.error("Failed to load Cassandra ring", exception);
- throw new RuntimeException(exception);
- }
+ LOGGER.debug("Fetched token-ranges with dcs={},
write_replica_count={}, pending_replica_count={}",
+ writeReplicasByDC.keySet(),
+
writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(),
+
pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size());
}
- return ringResponse;
}
- }
+ catch (ExecutionException | InterruptedException exception)
+ {
+ LOGGER.error("Failed to get token ranges, ", exception);
+ throw new RuntimeException(exception);
+ }
- private RingResponse getCurrentRingResponse() throws Exception
- {
- return
getCassandraContext().getSidecarClient().ring(conf.keyspace).get();
+ // Include availability info so CL checks can use it to exclude
replacement hosts
+ return new TokenRangeMapping<>(getPartitioner(),
+ getReplicationFactor(),
+ writeReplicasByDC,
+ pendingReplicasByDC,
+ tokenRangesByInstance,
+ replicaMetadata,
+ blockedInstances,
+ replacementInstances);
}
- private static List<RingInstance> getSerializableInstances(RingResponse
ringResponse)
+ private Set<String> filterAndMergeInstances(Set<String> instancesList1,
Set<String> instancesList2, Set<String> blockedIPs)
{
- return ringResponse.stream()
- .map(RingInstance::new)
- .collect(Collectors.toList());
+ Set<String> merged = new HashSet<>();
+ // Removes blocked instances. If this is included, remove
blockedInstances from CL checks
+ merged.addAll(instancesList1.stream().filter(i ->
!blockedIPs.contains(i)).collect(Collectors.toSet()));
+ merged.addAll(instancesList2.stream().filter(i ->
!blockedIPs.contains(i)).collect(Collectors.toSet()));
+
+ return merged;
}
- private static RingInstance getCasInstanceMethodsImpl(RingEntry ringEntry)
+ // Pending replicas are currently calculated by extracting the
non-read-replicas from the write-replica-set
+ // This will be replaced by the instance state metadata when it is
supported by the token-ranges API
+ private Map<String, Set<String>>
getPendingReplicas(TokenRangeReplicasResponse response, Map<String,
Set<String>> writeReplicasByDC)
{
- return new RingInstance(ringEntry);
+ Set<String> readReplicas =
readReplicasFromTokenRangeResponse(response);
+ return writeReplicasByDC.entrySet()
+ .stream()
+ .filter(entry ->
entry.getValue().stream().noneMatch(readReplicas::contains))
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
}
- protected GossipInfoResponse getGossipInfo(boolean forceRefresh)
+ private Multimap<RingInstance, Range<BigInteger>>
getTokenRangesByInstance(List<ReplicaInfo> writeReplicas,
+
List<ReplicaMetadata> replicaMetadata)
{
- GossipInfoResponse currentGossipInfo = gossipInfo;
- if (!forceRefresh && currentGossipInfo != null)
- {
- return currentGossipInfo;
- }
-
- synchronized (this)
+ Multimap<RingInstance, Range<BigInteger>> instanceToRangeMap =
ArrayListMultimap.create();
+ for (ReplicaInfo rInfo : writeReplicas)
{
- if (forceRefresh || gossipInfo == null)
+ Range<BigInteger> range = Range.openClosed(new
BigInteger(rInfo.start()), new BigInteger(rInfo.end()));
+ for (Map.Entry<String, List<String>> dcReplicaEntry :
rInfo.replicasByDatacenter().entrySet())
{
- try
- {
- gossipInfo =
cassandraContext.getSidecarClient().gossipInfo().get(conf.getHttpResponseTimeoutMs(),
-
TimeUnit.MILLISECONDS);
- }
- catch (ExecutionException | InterruptedException exception)
- {
- LOGGER.error("Failed to retrieve gossip information");
- throw new RuntimeException("Failed to retrieve gossip
information", exception);
- }
- catch (TimeoutException exception)
- {
- Thread.currentThread().interrupt();
- throw new RuntimeException("Failed to retrieve gossip
information", exception);
- }
+ // For each writeReplica, get metadata and update map to
include range
+ dcReplicaEntry.getValue().forEach(ipAddress -> {
+ // Get metadata for this IP; Create RingInstance
+ // TODO: Temporary change to extract IP from 'ip:port' string. This will go away once
+ // corresponding change in sidecar is merged.
+ ReplicaMetadata replica = replicaMetadata.stream()
+ .filter(r ->
+
r.address().equals(ipAddress.split(":")[0]))
+
.findFirst().get();
Review Comment:
can you handle the potential NoSuchElementException from `get()` gracefully?
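For example, one way to do this (a sketch only; the exception type and message are placeholders, not part of the PR) is to replace `get()` with `orElseThrow`:
```java
// Sketch: fail with a descriptive error instead of a bare NoSuchElementException.
ReplicaMetadata replica =
        replicaMetadata.stream()
                       .filter(r -> r.address().equals(ipAddress.split(":")[0]))
                       .findFirst()
                       .orElseThrow(() -> new RuntimeException(
                               String.format("No replica metadata found for instance %s", ipAddress)));
```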
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java:
##########
@@ -145,32 +157,27 @@ private List<CommitResult> commit(StreamResult
streamResult) throws ExecutionExc
@VisibleForTesting
List<RingInstance> getReplicas()
{
- Map<Range<BigInteger>, List<RingInstance>> overlappingRanges =
ring.getSubRanges(tokenRange).asMapOfRanges();
+ List<RingInstance> exclusions = failureHandler.getFailedInstances();
+ final Map<Range<BigInteger>, List<RingInstance>> overlappingRanges =
tokenRangeMapping.getSubRanges(tokenRange).asMapOfRanges();
- Preconditions.checkState(overlappingRanges.keySet().size() == 1,
- String.format("Partition range %s is mapping
more than one range %s",
- tokenRange, overlappingRanges));
+ LOGGER.debug("[{}]: Stream session token range: {} overlaps with ring
ranges: {}", sessionID, tokenRange, overlappingRanges);
+ Preconditions.checkState(!tokenRange.isEmpty(),
Review Comment:
Maybe move this check before getting the `overlappingRanges`, so that `tokenRange`
is validated before it is used. Or do you mean to check that `overlappingRanges`
is not empty?
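As a sketch of the first reading, only the ordering changes; identifiers are taken from the diff and the error message here is a placeholder:
```java
// Validate the session's token range first, then derive the overlapping sub-ranges.
Preconditions.checkState(!tokenRange.isEmpty(),
                         "Token range %s for the stream session must not be empty", tokenRange);
Map<Range<BigInteger>, List<RingInstance>> overlappingRanges =
        tokenRangeMapping.getSubRanges(tokenRange).asMapOfRanges();
```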
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TokenPartitioner.java:
##########
@@ -215,19 +221,19 @@ private void readObject(ObjectInputStream in) throws
ClassNotFoundException, IOE
// In order to best utilize the number of Spark cores while minimizing the
number of commit calls,
// we calculate the number of splits that will just match or exceed the
total number of available Spark cores.
- // NOTE: The actual number of partitions that result from this should
always be at least
- // the number of token ranges times the number of splits, but can be
slightly more.
- public int calculateSplits(CassandraRing<RingInstance> ring,
+ // Note that the actual number of partitions that result from this should
always be at least the number of token ranges * the number of splits,
+ // but can be slightly more.
+ public int calculateSplits(TokenRangeMapping<RingInstance>
tokenRangeMapping,
Integer numberSplits,
int defaultParallelism,
Integer cores)
{
- if (numberSplits >= 0)
+ if (numberSplits != -1)
{
return numberSplits;
}
- int tasksToRun = Math.max(cores, defaultParallelism);
- Map<Range<BigInteger>, List<RingInstance>> rangeListMap =
ring.getRangeMap().asMapOfRanges();
+ final int tasksToRun = Math.max(cores, defaultParallelism);
+ final Map<Range<BigInteger>, List<RingInstance>> rangeListMap =
tokenRangeMapping.getRangeMap().asMapOfRanges();
Review Comment:
drop the `final`
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TokenPartitioner.java:
##########
@@ -138,15 +143,15 @@ private void validate()
private void validateRangesDoNotOverlap()
{
List<Range<BigInteger>> sortedRanges =
partitionMap.asMapOfRanges().keySet().stream()
- .sorted(Comparator.comparing(Range::lowerEndpoint))
- .collect(Collectors.toList());
+
.sorted(Comparator.comparing(Range::lowerEndpoint))
+
.collect(Collectors.toList());
Range<BigInteger> previous = null;
for (Range<BigInteger> current : sortedRanges)
{
if (previous != null)
{
Preconditions.checkState(!current.isConnected(previous) ||
current.intersection(previous).isEmpty(),
- String.format("Two ranges in partition map are
overlapping %s %s", previous, current));
+ String.format("Two ranges in
partition map are overlapping %s %s", previous, current));
Review Comment:
Since this line is changed anyway, you can use the other `checkState` API to lazily
evaluate the error message only when the check fails.
```suggestion
"Two ranges in partition map are
overlapping %s %s", previous, current);
```
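For reference, the full call using the lazily-formatted Guava overload would look roughly like this:
```java
// Guava formats the message only if the check fails, avoiding the eager String.format.
Preconditions.checkState(!current.isConnected(previous) || current.intersection(previous).isEmpty(),
                         "Two ranges in partition map are overlapping %s %s", previous, current);
```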
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java:
##########
@@ -59,42 +87,34 @@ public boolean isLocal()
}
@Override
- public boolean checkConsistency(Collection<? extends
CassandraInstance> failedInsts,
- ReplicationFactor
replicationFactor,
+ public boolean checkConsistency(Set<String> writeReplicas,
+ Set<String> pendingReplicas,
+ Set<String> replacementInstances,
+ Set<String> blockedInstances,
+ Set<String> failedInstanceIps,
String localDC)
{
-
Preconditions.checkArgument(replicationFactor.getReplicationStrategy() !=
ReplicationFactor.ReplicationStrategy.SimpleStrategy,
- "EACH_QUORUM doesn't make sense
for SimpleStrategy keyspaces");
-
- for (String datacenter :
replicationFactor.getOptions().keySet())
- {
- int rf = replicationFactor.getOptions().get(datacenter);
- if (failedInsts.stream()
- .filter(instance ->
instance.getDataCenter().matches(datacenter))
- .count() > (rf - (rf / 2 + 1)))
- {
- return false;
- }
- }
-
- return true;
+ return (failedInstanceIps.size() + blockedInstances.size()) <=
(writeReplicas.size() - (writeReplicas.size() / 2 + 1));
Review Comment:
The implementation looks wrong to me. For EACH_QUORUM, the success condition
is to meet quorum in each DC. The replication factor could vary per DC.
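For context, a per-DC quorum check along those lines might look like the sketch below; the method name and parameters are illustrative and not taken from the PR:
```java
// Hedged sketch of an EACH_QUORUM check that enforces quorum per DC.
private static boolean eachQuorumSatisfied(Map<String, Set<String>> writeReplicasByDC,
                                           Set<String> failedOrBlockedIps)
{
    for (Set<String> dcReplicas : writeReplicasByDC.values())
    {
        int rf = dcReplicas.size();
        long failedInDc = dcReplicas.stream().filter(failedOrBlockedIps::contains).count();
        // At most rf - (rf / 2 + 1) failures can be tolerated while still reaching quorum in this DC
        if (failedInDc > rf - (rf / 2 + 1))
        {
            return false;
        }
    }
    return true;
}
```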
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TokenPartitioner.java:
##########
@@ -215,19 +221,19 @@ private void readObject(ObjectInputStream in) throws
ClassNotFoundException, IOE
// In order to best utilize the number of Spark cores while minimizing the
number of commit calls,
// we calculate the number of splits that will just match or exceed the
total number of available Spark cores.
- // NOTE: The actual number of partitions that result from this should
always be at least
- // the number of token ranges times the number of splits, but can be
slightly more.
- public int calculateSplits(CassandraRing<RingInstance> ring,
+ // Note that the actual number of partitions that result from this should
always be at least the number of token ranges * the number of splits,
+ // but can be slightly more.
+ public int calculateSplits(TokenRangeMapping<RingInstance>
tokenRangeMapping,
Integer numberSplits,
int defaultParallelism,
Integer cores)
{
- if (numberSplits >= 0)
+ if (numberSplits != -1)
Review Comment:
This change is worrisome. There is no validation on the value of `numberSplits`.
An end user can provide a wrong value, say `-10`; the original code can handle it,
but the changed code breaks.
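A minimal sketch of a more defensive check, assuming any negative value should fall back to the computed split count:
```java
// Only honor an explicit, non-negative split count; negatives fall through
// to the calculation based on cores and parallelism.
if (numberSplits >= 0)
{
    return numberSplits;
}
```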
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java:
##########
@@ -94,6 +95,12 @@ public Object[] normalize(Object[] row)
}
public Object[] getKeyColumns(Object[] allColumns)
+ {
+ return getKeyColumns(allColumns, keyFieldPositions);
+ }
+
+ @NotNull
+ public static Object[] getKeyColumns(Object[] allColumns, List<Integer>
keyFieldPositions)
Review Comment:
What is the benefit of allowing the key field positions to be specified? It looks like
the method is only used for testing via the `Tokenizer` constructor. If so, can
you add the `@VisibleForTesting` annotation?
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java:
##########
@@ -59,42 +87,34 @@ public boolean isLocal()
}
@Override
- public boolean checkConsistency(Collection<? extends
CassandraInstance> failedInsts,
- ReplicationFactor
replicationFactor,
+ public boolean checkConsistency(Set<String> writeReplicas,
+ Set<String> pendingReplicas,
+ Set<String> replacementInstances,
+ Set<String> blockedInstances,
+ Set<String> failedInstanceIps,
String localDC)
{
-
Preconditions.checkArgument(replicationFactor.getReplicationStrategy() !=
ReplicationFactor.ReplicationStrategy.SimpleStrategy,
- "EACH_QUORUM doesn't make sense
for SimpleStrategy keyspaces");
-
- for (String datacenter :
replicationFactor.getOptions().keySet())
- {
- int rf = replicationFactor.getOptions().get(datacenter);
- if (failedInsts.stream()
- .filter(instance ->
instance.getDataCenter().matches(datacenter))
- .count() > (rf - (rf / 2 + 1)))
- {
- return false;
- }
- }
-
- return true;
+ return (failedInstanceIps.size() + blockedInstances.size()) <=
(writeReplicas.size() - (writeReplicas.size() / 2 + 1));
}
},
QUORUM
{
+ // Keyspaces exist with RF 1 or 2
Review Comment:
This comment does not make sense in the context of QUORUM.
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ReplicaAwareFailureHandler.java:
##########
@@ -71,64 +72,138 @@ public void addFailure(Range<BigInteger> tokenRange,
Instance casInstance, Strin
for (Map.Entry<Range<BigInteger>, Multimap<Instance, String>> entry :
overlappingFailures.asMapOfRanges().entrySet())
{
Multimap<Instance, String> newErrorMap =
ArrayListMultimap.create(entry.getValue());
-
newErrorMap.put(casInstance, errMessage);
mappingsToAdd.put(entry.getKey(), newErrorMap);
}
failedRangesMap.putAll(mappingsToAdd);
}
- public boolean hasFailed(ConsistencyLevel consistencyLevel, String localDC)
+ public List<Instance> getFailedInstances()
{
- return !getFailedEntries(consistencyLevel, localDC).isEmpty();
+ return failedRangesMap.asMapOfRanges().values()
+ .stream()
+ .map(Multimap::keySet)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
}
- @SuppressWarnings("unused") // Convenience method can become useful in
the future
- public Collection<Range<BigInteger>> getFailedRanges(ConsistencyLevel
consistencyLevel, String localDC)
+ /**
+ * Given the number of failed instances for each token range, validates if
the consistency guarantees are maintained
+ * for the size of the ring and the consistency level.
+ *
+ * @return list of failed entries for token ranges that break consistency.
This should ideally be empty for a
+ * successful operation.
+ */
+ public Collection<AbstractMap.SimpleEntry<Range<BigInteger>,
Multimap<Instance, String>>>
+ getFailedEntries(TokenRangeMapping<? extends CassandraInstance>
tokenRangeMapping,
+ ConsistencyLevel cl,
+ String localDC)
{
- return getFailedEntries(consistencyLevel, localDC).stream()
- .map(AbstractMap.SimpleEntry::getKey)
- .collect(Collectors.toList());
- }
- @SuppressWarnings("unused") // Convenience method can become useful in
the future
- public Multimap<Instance, String> getFailedInstances(ConsistencyLevel
consistencyLevel, String localDC)
- {
- Multimap<Instance, String> failedInstances =
ArrayListMultimap.create();
- getFailedEntries(consistencyLevel, localDC).stream()
- .flatMap(failedMultiEntry ->
failedMultiEntry.getValue().entries().stream())
- .forEach(failedEntry ->
failedInstances.put(failedEntry.getKey(), failedEntry.getValue()));
+ List<AbstractMap.SimpleEntry<Range<BigInteger>, Multimap<Instance,
String>>> failedEntries =
+ new ArrayList<>();
+
+ for (Map.Entry<Range<BigInteger>, Multimap<Instance, String>>
failedRangeEntry
+ : failedRangesMap.asMapOfRanges().entrySet())
Review Comment:
It is unusual to break this into two lines.
##########
cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java:
##########
@@ -209,31 +222,49 @@ private InstancesConfig
buildInstancesConfig(CassandraVersionProvider versionPro
int nativeTransportPort = tryGetIntConfig(config,
"native_transport_port", 9042);
InetSocketAddress address =
InetSocketAddress.createUnresolved(hostName,
nativeTransportPort);
- CQLSessionProvider sessionProvider = new
CQLSessionProvider(address, new NettyOptions());
+ addresses.add(address);
+ }
+ for (int i = 0; i < cluster.size(); i++)
+ {
+ IUpgradeableInstance instance = cluster.get(i + 1); // 1-based
indexing to match node names;
+ IInstanceConfig config = instance.config();
+ String hostName = JMXUtil.getJmxHost(config);
+ int nativeTransportPort = tryGetIntConfig(config,
"native_transport_port", 9042);
+ InetSocketAddress address =
InetSocketAddress.createUnresolved(hostName,
+
nativeTransportPort);
+ TemporaryCqlSessionProvider sessionProvider = new
TemporaryCqlSessionProvider(address, new NettyOptions(), addresses);
this.sessionProviders.add(sessionProvider);
- JmxClient jmxClient = new JmxClient(hostName, config.jmxPort());
+ // The in-jvm dtest framework sometimes returns a cluster before
all the jmx infrastructure is initialized.
+ // In these cases, we want to wait longer than the default
retry/delay settings to connect.
Review Comment:
👍 thanks for the comment
##########
cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/CassandraIntegrationTest.java:
##########
@@ -59,6 +59,13 @@
*/
int numDcs() default 1;
+ /**
+ * This is only applied in context of multi-DC tests. Returns true if the
keyspace is replicated
+ * across multiple DCs. Defaults to {@code true}
+ * @return whether the multi-DC test uses a cross-DC keyspace
+ */
+ boolean useCrossDcKeyspace() default true;
Review Comment:
It is more flexible to define keyspaces directly in the test with a helper method,
instead of declaring an option in the annotation. For example, you can define a
keyspace with a varying number of DCs and RFs per test.
That being said, I am fine with the current code.
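If you do go that route, such a helper could look roughly like the sketch below; the method name and the use of the driver `Session` here are assumptions for illustration, not code from the PR:
```java
// Hypothetical test helper: create a keyspace with an arbitrary per-DC replication factor.
static void createKeyspace(Session session, String keyspace, Map<String, Integer> rfByDc)
{
    String rfOptions = rfByDc.entrySet().stream()
                             .map(e -> String.format("'%s': %d", e.getKey(), e.getValue()))
                             .collect(Collectors.joining(", "));
    session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = "
                                  + "{'class': 'NetworkTopologyStrategy', %s}",
                                  keyspace, rfOptions));
}
```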
##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenPartitionerTest.java:
##########
@@ -58,27 +58,34 @@ public void testOneSplit()
@Test
public void testTwoSplits()
{
- CassandraRing<RingInstance> ring = RingUtils.buildRing(0, "DC1",
"test");
- partitioner = new TokenPartitioner(ring, 2, 2, 1, false);
- assertEquals(10, partitioner.numPartitions());
- assertEquals(0, getPartitionForToken(new
BigInteger("-4611686018427387905")));
- assertEquals(1, getPartitionForToken(new
BigInteger("-4611686018427387904")));
- assertEquals(1, getPartitionForToken(-1));
- assertEquals(2, getPartitionForToken(0)); // Single token range
- assertEquals(3, getPartitionForToken(1));
- assertEquals(3, getPartitionForToken(50));
- assertEquals(4, getPartitionForToken(51000));
- assertEquals(4, getPartitionForToken(51100));
- assertEquals(5, getPartitionForToken(100001));
- assertEquals(5, getPartitionForToken(100150));
- assertEquals(5, getPartitionForToken(150000));
- assertEquals(6, getPartitionForToken(150001));
- assertEquals(6, getPartitionForToken(200000));
- assertEquals(7, getPartitionForToken(200001));
- assertEquals(7, getPartitionForToken(new
BigInteger("4611686018427388003")));
- assertEquals(7, getPartitionForToken(new
BigInteger("4611686018427487903")));
- assertEquals(8, getPartitionForToken(new
BigInteger("4611686018427487904")));
- assertEquals(9, getPartitionForToken(new
BigInteger("9223372036854775807"))); // Single token range
+ final TokenRangeMapping<RingInstance> tokenRangeMapping =
TokenRangeMappingUtils.buildTokenRangeMapping(0, ImmutableMap.of("DC1", 3), 3);
Review Comment:
drop the final
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java:
##########
@@ -26,4 +26,6 @@ public interface CassandraInstance
String getNodeName();
String getDataCenter();
+
+ String getIpAddress();
Review Comment:
add javadoc for the new method.
It is fine to ignore the existing methods that are w/o javadoc.
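Something along these lines would do; the wording is a placeholder and should be adjusted to the actual semantics:
```java
/**
 * Returns the IP address of this Cassandra instance, as reported by the
 * cluster / token-range metadata.
 *
 * @return the instance's IP address
 */
String getIpAddress();
```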
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java:
##########
@@ -59,42 +87,34 @@ public boolean isLocal()
}
@Override
- public boolean checkConsistency(Collection<? extends
CassandraInstance> failedInsts,
- ReplicationFactor
replicationFactor,
+ public boolean checkConsistency(Set<String> writeReplicas,
+ Set<String> pendingReplicas,
+ Set<String> replacementInstances,
+ Set<String> blockedInstances,
+ Set<String> failedInstanceIps,
String localDC)
{
-
Preconditions.checkArgument(replicationFactor.getReplicationStrategy() !=
ReplicationFactor.ReplicationStrategy.SimpleStrategy,
- "EACH_QUORUM doesn't make sense
for SimpleStrategy keyspaces");
-
- for (String datacenter :
replicationFactor.getOptions().keySet())
- {
- int rf = replicationFactor.getOptions().get(datacenter);
- if (failedInsts.stream()
- .filter(instance ->
instance.getDataCenter().matches(datacenter))
- .count() > (rf - (rf / 2 + 1)))
- {
- return false;
- }
- }
-
- return true;
+ return (failedInstanceIps.size() + blockedInstances.size()) <=
(writeReplicas.size() - (writeReplicas.size() / 2 + 1));
Review Comment:
Looks like you moved the per-DC check to `ReplicaAwareFailureHandler#validateConsistency`.
##########
cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TemporaryCqlSessionProvider.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.testing;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.NettyOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.DriverException;
+import com.datastax.driver.core.exceptions.DriverInternalError;
+import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
+import com.datastax.driver.core.policies.ReconnectionPolicy;
+import org.apache.cassandra.sidecar.common.CQLSessionProvider;
+import org.jetbrains.annotations.Nullable;
+
+public class TemporaryCqlSessionProvider extends CQLSessionProvider
+{
+ private static final Logger logger =
LoggerFactory.getLogger(TemporaryCqlSessionProvider.class);
+ private Session localSession;
+ private final InetSocketAddress inet;
+ private final NettyOptions nettyOptions;
+ private final ReconnectionPolicy reconnectionPolicy;
+ private final List<InetSocketAddress> addresses = new ArrayList<>();
+
+ public TemporaryCqlSessionProvider(InetSocketAddress target, NettyOptions
options, List<InetSocketAddress> addresses)
+ {
+ super(target, options);
+ inet = target;
+ nettyOptions = options;
+ reconnectionPolicy = new ExponentialReconnectionPolicy(100, 1000);
+ this.addresses.addAll(addresses);
+ }
+
+ @Override
+ public synchronized @Nullable Session localCql()
Review Comment:
nit
```suggestion
@Override @Nullable
public synchronized Session localCql()
```
##########
cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java:
##########
@@ -209,31 +222,49 @@ private InstancesConfig
buildInstancesConfig(CassandraVersionProvider versionPro
int nativeTransportPort = tryGetIntConfig(config,
"native_transport_port", 9042);
InetSocketAddress address =
InetSocketAddress.createUnresolved(hostName,
nativeTransportPort);
- CQLSessionProvider sessionProvider = new
CQLSessionProvider(address, new NettyOptions());
+ addresses.add(address);
+ }
+ for (int i = 0; i < cluster.size(); i++)
+ {
+ IUpgradeableInstance instance = cluster.get(i + 1); // 1-based
indexing to match node names;
+ IInstanceConfig config = instance.config();
+ String hostName = JMXUtil.getJmxHost(config);
+ int nativeTransportPort = tryGetIntConfig(config,
"native_transport_port", 9042);
+ InetSocketAddress address =
InetSocketAddress.createUnresolved(hostName,
+
nativeTransportPort);
+ TemporaryCqlSessionProvider sessionProvider = new
TemporaryCqlSessionProvider(address, new NettyOptions(), addresses);
this.sessionProviders.add(sessionProvider);
- JmxClient jmxClient = new JmxClient(hostName, config.jmxPort());
+ // The in-jvm dtest framework sometimes returns a cluster before
all the jmx infrastructure is initialized.
+ // In these cases, we want to wait longer than the default
retry/delay settings to connect.
Review Comment:
👍 thanks for the comment