frankgh commented on code in PR #72:
URL: 
https://github.com/apache/cassandra-analytics/pull/72#discussion_r1711290581


##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##########
@@ -184,6 +184,10 @@ public WriteResult write(Iterator<Tuple2<DecoratedKey, 
Object[]>> sourceIterator
             Range<BigInteger> currentRange = subRanges.get(currentRangeIndex);
             while (dataIterator.hasNext())
             {
+                if (streamSession != null)
+                {
+                    streamSession.throwIfLastStreamFailed();

Review Comment:
   I feel this check belongs to the `maybeSwitchToNewStreamSession` method?
   
   i.e.
   ```suggestion
           if (streamSession != null)
           {
               streamSession.throwIfLastStreamFailed();
   
               if (streamSession.getTokenRange().equals(currentRange))
               {
                   return;
               }
   
               // Schedule data to be sent if we are processing a batch that 
has not been scheduled yet.
               // Complete existing writes (if any) before the existing stream 
session is closed
               flushAsync(taskContext.partitionId());
           }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to