arjunashok commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1420972314
########## cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java: ########## @@ -69,64 +78,217 @@ public class RecordWriterTest @BeforeEach public void setUp() { - tw = new MockTableWriter(folder); - ring = RingUtils.buildRing(0, "DC1", "test", 12); - writerContext = new MockBulkWriterContext(ring); + tw = new MockTableWriter(folder.getRoot()); + tokenRangeMapping = TokenRangeMappingUtils.buildTokenRangeMapping(100000, ImmutableMap.of("DC1", 3), 12); + writerContext = new MockBulkWriterContext(tokenRangeMapping); tc = new TestTaskContext(); range = writerContext.job().getTokenPartitioner().getTokenRange(tc.partitionId()); tokenizer = new Tokenizer(writerContext); } @Test - public void testSuccessfulWrite() + public void testWriteFailWhenTopologyChangeWithinTask() + { + // Generate token range mapping to simulate node movement of the first node by assigning it a different token + // within the same partition + int moveTargetToken = 50000; + TokenRangeMapping<RingInstance> testMapping = + TokenRangeMappingUtils.buildTokenRangeMapping(100000, + ImmutableMap.of("DC1", 3), + 12, + true, + moveTargetToken); + + MockBulkWriterContext m = Mockito.spy(writerContext); + rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SSTableWriter::new); + + when(m.getTokenRangeMapping(false)).thenCallRealMethod().thenReturn(testMapping); + Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true); + RuntimeException ex = assertThrows(RuntimeException.class, () -> rw.write(data)); + assertThat(ex.getMessage(), endsWith("Token range mappings have changed since the task started")); + } + + @Test + public void testWriteWithExclusions() + { + TokenRangeMapping<RingInstance> testMapping = + TokenRangeMappingUtils.buildTokenRangeMappingWithFailures(100000, + ImmutableMap.of("DC1", 3), + 12); + + MockBulkWriterContext m = Mockito.spy(writerContext); + rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SSTableWriter::new); + + when(m.getTokenRangeMapping(anyBoolean())).thenReturn(testMapping); + when(m.getInstanceAvailability()).thenCallRealMethod(); + Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true); + rw.write(data); + Map<CassandraInstance, List<UploadRequest>> uploads = writerContext.getUploads(); + assertThat(uploads.keySet().size(), is(REPLICA_COUNT)); // Should upload to 3 replicas + } + + @Test + public void testSuccessfulWrite() throws InterruptedException { Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true); validateSuccessfulWrite(writerContext, data, COLUMN_NAMES); } @Test - public void testWriteWithConstantTTL() + public void testSuccessfulWriteCheckUploads() { - MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring); + rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, SSTableWriter::new); + Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true); + rw.write(data); + Map<CassandraInstance, List<UploadRequest>> uploads = writerContext.getUploads(); + assertThat(uploads.keySet().size(), is(REPLICA_COUNT)); // Should upload to 3 replicas + assertThat(uploads.values().stream().mapToInt(List::size).sum(), is(REPLICA_COUNT * FILES_PER_SSTABLE * UPLOADED_TABLES)); + List<UploadRequest> requests = uploads.values().stream().flatMap(List::stream).collect(Collectors.toList()); + for (UploadRequest ur : requests) + { + assertNotNull(ur.fileHash); + } + } + + @Test + public void testWriteWithConstantTTL() throws InterruptedException + { + MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(tokenRangeMapping); Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, false, false); validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES); } @Test - public void testWriteWithTTLColumn() + public void testWriteWithTTLColumn() throws InterruptedException { - MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring); + MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(tokenRangeMapping); Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, true, false); - String[] columnNamesWithTtl = {"id", "date", "course", "marks", "ttl"}; + String[] columnNamesWithTtl = + { + "id", "date", "course", "marks", "ttl" + }; validateSuccessfulWrite(bulkWriterContext, data, columnNamesWithTtl); } @Test - public void testWriteWithConstantTimestamp() + public void testWriteWithConstantTimestamp() throws InterruptedException { - MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring); + MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(tokenRangeMapping); Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, false, false); validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES); } @Test - public void testWriteWithTimestampColumn() + public void testWriteWithTimestampColumn() throws InterruptedException { - MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring); + MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(tokenRangeMapping); Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, false, true); - String[] columnNamesWithTimestamp = {"id", "date", "course", "marks", "timestamp"}; + String[] columnNamesWithTimestamp = + { + "id", "date", "course", "marks", "timestamp" + }; validateSuccessfulWrite(bulkWriterContext, data, columnNamesWithTimestamp); } @Test - public void testWriteWithTimestampAndTTLColumn() + public void testWriteWithTimestampAndTTLColumn() throws InterruptedException { - MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring); + MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(tokenRangeMapping); Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, true, true); - String[] columnNames = {"id", "date", "course", "marks", "ttl", "timestamp"}; + String[] columnNames = + { + "id", "date", "course", "marks", "ttl", "timestamp" + }; validateSuccessfulWrite(bulkWriterContext, data, columnNames); } + @Test + public void testWriteWithSubRanges() + { + MockBulkWriterContext m = Mockito.spy(writerContext); + TokenPartitioner mtp = Mockito.mock(TokenPartitioner.class); + when(m.job().getTokenPartitioner()).thenReturn(mtp); + + // Override partition's token range to span across ranges to force a split into sub-ranges + Range<BigInteger> overlapRange = Range.closed(BigInteger.valueOf(-9223372036854775808L), BigInteger.valueOf(200000)); + when(mtp.getTokenRange(anyInt())).thenReturn(overlapRange); + + rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SSTableWriter::new); + Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true); + List<StreamResult> res = rw.write(data); + assertEquals(1, res.size()); + assertNotEquals(overlapRange, res.get(0).tokenRange); + final Map<CassandraInstance, List<UploadRequest>> uploads = m.getUploads(); + // Should upload to 3 replicas + assertEquals(uploads.keySet().size(), REPLICA_COUNT); + assertThat(uploads.values().stream().mapToInt(List::size).sum(), is(REPLICA_COUNT * FILES_PER_SSTABLE * UPLOADED_TABLES)); + List<UploadRequest> requests = uploads.values().stream().flatMap(List::stream).collect(Collectors.toList()); + for (UploadRequest ur : requests) + { + assertNotNull(ur.fileHash); + } + } + + @Test + public void testWriteWithSubRanges2() Review Comment: Addressed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org