arjunashok commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1420972314


##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java:
##########
@@ -69,64 +78,217 @@ public class RecordWriterTest
     @BeforeEach
     public void setUp()
     {
-        tw = new MockTableWriter(folder);
-        ring = RingUtils.buildRing(0, "DC1", "test", 12);
-        writerContext = new MockBulkWriterContext(ring);
+        tw = new MockTableWriter(folder.getRoot());
+        tokenRangeMapping = 
TokenRangeMappingUtils.buildTokenRangeMapping(100000, ImmutableMap.of("DC1", 
3), 12);
+        writerContext = new MockBulkWriterContext(tokenRangeMapping);
         tc = new TestTaskContext();
         range = 
writerContext.job().getTokenPartitioner().getTokenRange(tc.partitionId());
         tokenizer = new Tokenizer(writerContext);
     }
 
     @Test
-    public void testSuccessfulWrite()
+    public void testWriteFailWhenTopologyChangeWithinTask()
+    {
+        // Generate token range mapping to simulate node movement of the first 
node by assigning it a different token
+        // within the same partition
+        int moveTargetToken = 50000;
+        TokenRangeMapping<RingInstance> testMapping =
+        TokenRangeMappingUtils.buildTokenRangeMapping(100000,
+                                                      ImmutableMap.of("DC1", 
3),
+                                                      12,
+                                                      true,
+                                                      moveTargetToken);
+
+        MockBulkWriterContext m = Mockito.spy(writerContext);
+        rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SSTableWriter::new);
+
+        
when(m.getTokenRangeMapping(false)).thenCallRealMethod().thenReturn(testMapping);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
+        RuntimeException ex = assertThrows(RuntimeException.class, () -> 
rw.write(data));
+        assertThat(ex.getMessage(), endsWith("Token range mappings have 
changed since the task started"));
+    }
+
+    @Test
+    public void testWriteWithExclusions()
+    {
+        TokenRangeMapping<RingInstance> testMapping =
+        TokenRangeMappingUtils.buildTokenRangeMappingWithFailures(100000,
+                                                      ImmutableMap.of("DC1", 
3),
+                                                      12);
+
+        MockBulkWriterContext m = Mockito.spy(writerContext);
+        rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SSTableWriter::new);
+
+        when(m.getTokenRangeMapping(anyBoolean())).thenReturn(testMapping);
+        when(m.getInstanceAvailability()).thenCallRealMethod();
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
+        rw.write(data);
+        Map<CassandraInstance, List<UploadRequest>> uploads = 
writerContext.getUploads();
+        assertThat(uploads.keySet().size(), is(REPLICA_COUNT));  // Should 
upload to 3 replicas
+    }
+
+    @Test
+    public void testSuccessfulWrite() throws InterruptedException
     {
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
         validateSuccessfulWrite(writerContext, data, COLUMN_NAMES);
     }
 
     @Test
-    public void testWriteWithConstantTTL()
+    public void testSuccessfulWriteCheckUploads()
     {
-        MockBulkWriterContext bulkWriterContext = new 
MockBulkWriterContext(ring);
+        rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, 
SSTableWriter::new);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
+        rw.write(data);
+        Map<CassandraInstance, List<UploadRequest>> uploads = 
writerContext.getUploads();
+        assertThat(uploads.keySet().size(), is(REPLICA_COUNT));  // Should 
upload to 3 replicas
+        assertThat(uploads.values().stream().mapToInt(List::size).sum(), 
is(REPLICA_COUNT * FILES_PER_SSTABLE * UPLOADED_TABLES));
+        List<UploadRequest> requests = 
uploads.values().stream().flatMap(List::stream).collect(Collectors.toList());
+        for (UploadRequest ur : requests)
+        {
+            assertNotNull(ur.fileHash);
+        }
+    }
+
+    @Test
+    public void testWriteWithConstantTTL() throws InterruptedException
+    {
+        MockBulkWriterContext bulkWriterContext = new 
MockBulkWriterContext(tokenRangeMapping);
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, 
false, false);
         validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES);
     }
 
     @Test
-    public void testWriteWithTTLColumn()
+    public void testWriteWithTTLColumn() throws InterruptedException
     {
-        MockBulkWriterContext bulkWriterContext = new 
MockBulkWriterContext(ring);
+        MockBulkWriterContext bulkWriterContext = new 
MockBulkWriterContext(tokenRangeMapping);
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, 
true, false);
-        String[] columnNamesWithTtl = {"id", "date", "course", "marks", "ttl"};
+        String[] columnNamesWithTtl =
+        {
+        "id", "date", "course", "marks", "ttl"
+        };
         validateSuccessfulWrite(bulkWriterContext, data, columnNamesWithTtl);
     }
 
     @Test
-    public void testWriteWithConstantTimestamp()
+    public void testWriteWithConstantTimestamp() throws InterruptedException
     {
-        MockBulkWriterContext bulkWriterContext = new 
MockBulkWriterContext(ring);
+        MockBulkWriterContext bulkWriterContext = new 
MockBulkWriterContext(tokenRangeMapping);
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, 
false, false);
         validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES);
     }
 
     @Test
-    public void testWriteWithTimestampColumn()
+    public void testWriteWithTimestampColumn() throws InterruptedException
     {
-        MockBulkWriterContext bulkWriterContext = new 
MockBulkWriterContext(ring);
+        MockBulkWriterContext bulkWriterContext = new 
MockBulkWriterContext(tokenRangeMapping);
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, 
false, true);
-        String[] columnNamesWithTimestamp = {"id", "date", "course", "marks", 
"timestamp"};
+        String[] columnNamesWithTimestamp =
+        {
+        "id", "date", "course", "marks", "timestamp"
+        };
         validateSuccessfulWrite(bulkWriterContext, data, 
columnNamesWithTimestamp);
     }
 
     @Test
-    public void testWriteWithTimestampAndTTLColumn()
+    public void testWriteWithTimestampAndTTLColumn() throws 
InterruptedException
     {
-        MockBulkWriterContext bulkWriterContext = new 
MockBulkWriterContext(ring);
+        MockBulkWriterContext bulkWriterContext = new 
MockBulkWriterContext(tokenRangeMapping);
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, 
true, true);
-        String[] columnNames = {"id", "date", "course", "marks", "ttl", 
"timestamp"};
+        String[] columnNames =
+        {
+        "id", "date", "course", "marks", "ttl", "timestamp"
+        };
         validateSuccessfulWrite(bulkWriterContext, data, columnNames);
     }
 
+    @Test
+    public void testWriteWithSubRanges()
+    {
+        MockBulkWriterContext m = Mockito.spy(writerContext);
+        TokenPartitioner mtp = Mockito.mock(TokenPartitioner.class);
+        when(m.job().getTokenPartitioner()).thenReturn(mtp);
+
+        // Override partition's token range to span across ranges to force a 
split into sub-ranges
+        Range<BigInteger> overlapRange = 
Range.closed(BigInteger.valueOf(-9223372036854775808L), 
BigInteger.valueOf(200000));
+        when(mtp.getTokenRange(anyInt())).thenReturn(overlapRange);
+
+        rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SSTableWriter::new);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
+        List<StreamResult> res = rw.write(data);
+        assertEquals(1, res.size());
+        assertNotEquals(overlapRange, res.get(0).tokenRange);
+        final Map<CassandraInstance, List<UploadRequest>> uploads = 
m.getUploads();
+        // Should upload to 3 replicas
+        assertEquals(uploads.keySet().size(), REPLICA_COUNT);
+        assertThat(uploads.values().stream().mapToInt(List::size).sum(), 
is(REPLICA_COUNT * FILES_PER_SSTABLE * UPLOADED_TABLES));
+        List<UploadRequest> requests = 
uploads.values().stream().flatMap(List::stream).collect(Collectors.toList());
+        for (UploadRequest ur : requests)
+        {
+            assertNotNull(ur.fileHash);
+        }
+    }
+
+    @Test
+    public void testWriteWithSubRanges2()

Review Comment:
   Addressed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org

Reply via email to