arjunashok commented on code in PR #17:
URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1420972314
##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java:
##########
@@ -69,64 +78,217 @@ public class RecordWriterTest
@BeforeEach
public void setUp()
{
- tw = new MockTableWriter(folder);
- ring = RingUtils.buildRing(0, "DC1", "test", 12);
- writerContext = new MockBulkWriterContext(ring);
+ tw = new MockTableWriter(folder.getRoot());
+ tokenRangeMapping = TokenRangeMappingUtils.buildTokenRangeMapping(100000, ImmutableMap.of("DC1", 3), 12);
+ writerContext = new MockBulkWriterContext(tokenRangeMapping);
tc = new TestTaskContext();
range = writerContext.job().getTokenPartitioner().getTokenRange(tc.partitionId());
tokenizer = new Tokenizer(writerContext);
}
@Test
- public void testSuccessfulWrite()
+ public void testWriteFailWhenTopologyChangeWithinTask()
+ {
+ // Generate token range mapping to simulate node movement of the first node by assigning it a different token
+ // within the same partition
+ int moveTargetToken = 50000;
+ TokenRangeMapping<RingInstance> testMapping =
+ TokenRangeMappingUtils.buildTokenRangeMapping(100000, ImmutableMap.of("DC1", 3), 12, true, moveTargetToken);
+
+ MockBulkWriterContext m = Mockito.spy(writerContext);
+ rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SSTableWriter::new);
+
+ when(m.getTokenRangeMapping(false)).thenCallRealMethod().thenReturn(testMapping);
+ Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
+ RuntimeException ex = assertThrows(RuntimeException.class, () -> rw.write(data));
+ assertThat(ex.getMessage(), endsWith("Token range mappings have changed since the task started"));
+ }
+
+ @Test
+ public void testWriteWithExclusions()
+ {
+ TokenRangeMapping<RingInstance> testMapping =
+ TokenRangeMappingUtils.buildTokenRangeMappingWithFailures(100000, ImmutableMap.of("DC1", 3), 12);
+
+ MockBulkWriterContext m = Mockito.spy(writerContext);
+ rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SSTableWriter::new);
+
+ when(m.getTokenRangeMapping(anyBoolean())).thenReturn(testMapping);
+ when(m.getInstanceAvailability()).thenCallRealMethod();
+ Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
+ rw.write(data);
+ Map<CassandraInstance, List<UploadRequest>> uploads = writerContext.getUploads();
+ assertThat(uploads.keySet().size(), is(REPLICA_COUNT)); // Should upload to 3 replicas
+ }
+
+ @Test
+ public void testSuccessfulWrite() throws InterruptedException
{
Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
validateSuccessfulWrite(writerContext, data, COLUMN_NAMES);
}
@Test
- public void testWriteWithConstantTTL()
+ public void testSuccessfulWriteCheckUploads()
{
- MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring);
+ rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, SSTableWriter::new);
+ Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
+ rw.write(data);
+ Map<CassandraInstance, List<UploadRequest>> uploads = writerContext.getUploads();
+ assertThat(uploads.keySet().size(), is(REPLICA_COUNT)); // Should upload to 3 replicas
+ assertThat(uploads.values().stream().mapToInt(List::size).sum(), is(REPLICA_COUNT * FILES_PER_SSTABLE * UPLOADED_TABLES));
+ List<UploadRequest> requests = uploads.values().stream().flatMap(List::stream).collect(Collectors.toList());
+ for (UploadRequest ur : requests)
+ {
+ assertNotNull(ur.fileHash);
+ }
+ }
+
+ @Test
+ public void testWriteWithConstantTTL() throws InterruptedException
+ {
+ MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(tokenRangeMapping);
Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, false, false);
validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES);
}
@Test
- public void testWriteWithTTLColumn()
+ public void testWriteWithTTLColumn() throws InterruptedException
{
- MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring);
+ MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(tokenRangeMapping);
Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, true, false);
- String[] columnNamesWithTtl = {"id", "date", "course", "marks", "ttl"};
+ String[] columnNamesWithTtl =
+ {
+ "id", "date", "course", "marks", "ttl"
+ };
validateSuccessfulWrite(bulkWriterContext, data, columnNamesWithTtl);
}
@Test
- public void testWriteWithConstantTimestamp()
+ public void testWriteWithConstantTimestamp() throws InterruptedException
{
- MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring);
+ MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(tokenRangeMapping);
Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, false, false);
validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES);
}
@Test
- public void testWriteWithTimestampColumn()
+ public void testWriteWithTimestampColumn() throws InterruptedException
{
- MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring);
+ MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(tokenRangeMapping);
Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, false, true);
- String[] columnNamesWithTimestamp = {"id", "date", "course", "marks", "timestamp"};
+ String[] columnNamesWithTimestamp =
+ {
+ "id", "date", "course", "marks", "timestamp"
+ };
validateSuccessfulWrite(bulkWriterContext, data, columnNamesWithTimestamp);
}
@Test
- public void testWriteWithTimestampAndTTLColumn()
+ public void testWriteWithTimestampAndTTLColumn() throws InterruptedException
{
- MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring);
+ MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(tokenRangeMapping);
Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, true, true);
- String[] columnNames = {"id", "date", "course", "marks", "ttl", "timestamp"};
+ String[] columnNames =
+ {
+ "id", "date", "course", "marks", "ttl", "timestamp"
+ };
validateSuccessfulWrite(bulkWriterContext, data, columnNames);
}
+ @Test
+ public void testWriteWithSubRanges()
+ {
+ MockBulkWriterContext m = Mockito.spy(writerContext);
+ TokenPartitioner mtp = Mockito.mock(TokenPartitioner.class);
+ when(m.job().getTokenPartitioner()).thenReturn(mtp);
+
+ // Override partition's token range to span across ranges to force a split into sub-ranges
+ Range<BigInteger> overlapRange = Range.closed(BigInteger.valueOf(-9223372036854775808L), BigInteger.valueOf(200000));
+ when(mtp.getTokenRange(anyInt())).thenReturn(overlapRange);
+
+ rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SSTableWriter::new);
+ Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
+ List<StreamResult> res = rw.write(data);
+ assertEquals(1, res.size());
+ assertNotEquals(overlapRange, res.get(0).tokenRange);
+ final Map<CassandraInstance, List<UploadRequest>> uploads = m.getUploads();
+ // Should upload to 3 replicas
+ assertEquals(uploads.keySet().size(), REPLICA_COUNT);
+ assertThat(uploads.values().stream().mapToInt(List::size).sum(), is(REPLICA_COUNT * FILES_PER_SSTABLE * UPLOADED_TABLES));
+ List<UploadRequest> requests = uploads.values().stream().flatMap(List::stream).collect(Collectors.toList());
+ for (UploadRequest ur : requests)
+ {
+ assertNotNull(ur.fileHash);
+ }
+ }
+
+ @Test
+ public void testWriteWithSubRanges2()
Review Comment:
Addressed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]