frankgh commented on code in PR #72:
URL: https://github.com/apache/cassandra-analytics/pull/72#discussion_r1711290581

##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##########
@@ -184,6 +184,10 @@ public WriteResult write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceIterator
             Range<BigInteger> currentRange = subRanges.get(currentRangeIndex);
             while (dataIterator.hasNext())
             {
+                if (streamSession != null)
+                {
+                    streamSession.throwIfLastStreamFailed();

Review Comment:
I feel this check belongs to the `maybeSwitchToNewStreamSession` method? i.e.

```suggestion
        if (streamSession != null)
        {
            streamSession.throwIfLastStreamFailed();
            if (streamSession.getTokenRange().equals(currentRange))
            {
                return;
            }
            // Schedule data to be sent if we are processing a batch that has not been scheduled yet.
            // Complete existing writes (if any) before the existing stream session is closed
            flushAsync(taskContext.partitionId());
        }
```
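
To illustrate the shape of the suggestion outside the real code base, here is a minimal, self-contained sketch. Everything in it is a hypothetical stand-in: `FakeStreamSession`, `Writer`, the `String` token range, and the no-argument `flushAsync` are assumptions for illustration, not the actual `RecordWriter` or stream session classes. The tail of the method, which opens a new session, is also assumed. The sketch only shows that placing the failure check inside `maybeSwitchToNewStreamSession` makes every caller fail fast before the range comparison or the flush.

```java
// Sketch only: hypothetical stand-ins, not the cassandra-analytics classes.
final class StreamSessionSwitchSketch
{
    /** Hypothetical session that remembers the failure of its last stream, if any. */
    static final class FakeStreamSession
    {
        private final String tokenRange;
        private volatile Throwable lastStreamFailure;

        FakeStreamSession(String tokenRange)
        {
            this.tokenRange = tokenRange;
        }

        String getTokenRange()
        {
            return tokenRange;
        }

        void recordFailure(Throwable failure)
        {
            lastStreamFailure = failure;
        }

        /** Rethrows the recorded failure, wrapped, so the caller stops early. */
        void throwIfLastStreamFailed()
        {
            Throwable failure = lastStreamFailure;
            if (failure != null)
            {
                throw new RuntimeException("Last stream failed", failure);
            }
        }
    }

    /** Hypothetical writer holding the current session. */
    static final class Writer
    {
        FakeStreamSession streamSession;
        int flushCount = 0;

        void maybeSwitchToNewStreamSession(String currentRange)
        {
            if (streamSession != null)
            {
                // Fail fast before doing any further work on a broken session.
                streamSession.throwIfLastStreamFailed();
                if (streamSession.getTokenRange().equals(currentRange))
                {
                    return; // same range, keep the current session
                }
                // Complete pending writes before the old session is replaced.
                flushAsync();
            }
            // Assumed tail: open a session for the new range.
            streamSession = new FakeStreamSession(currentRange);
        }

        void flushAsync()
        {
            flushCount++; // placeholder for scheduling the real flush
        }
    }
}
```

With this placement, the write loop no longer needs its own null check: calling `maybeSwitchToNewStreamSession` per row covers both the failure check and the session switch.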