Hi folks,

I am noticing my Spark jobs getting stuck when using org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter/CqlBulkOutputFormat.
It seems that whenever there is a stream failure, the code can be expected to loop forever. Here are one executor's logs:

    19/04/03 15:35:06 INFO streaming.StreamResultFuture: [Stream #59290530-5625-11e9-a2bb-8bc7b49d56b0] Session with /10.82.204.173 is complete
    19/04/03 15:35:06 WARN streaming.StreamResultFuture: [Stream #59290530-5625-11e9-a2bb-8bc7b49d56b0] Stream failed

On stream failure, StreamResultFuture sets the exception on the AbstractFuture. AFAIK this should cause the AbstractFuture to throw a new ExecutionException from get(). The problem seems to lie in the fact that CqlBulkRecordWriter swallows the ExecutionException and continues in a while loop:

https://github.com/apache/cassandra/blob/207c80c1fd63dfbd8ca7e615ec8002ee8983c5d6/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java#L256-L274

Taking consecutive thread dumps of the same process, I see that the only thread doing work is constantly creating new ExecutionExceptions (the memory location of the ExecutionException was different in each thread dump):

    java.lang.Throwable.fillInStackTrace(Native Method)
    java.lang.Throwable.fillInStackTrace(Throwable.java:783) => holding Monitor(java.util.concurrent.ExecutionException@80240763)
    java.lang.Throwable.<init>(Throwable.java:310)
    java.lang.Exception.<init>(Exception.java:102)
    java.util.concurrent.ExecutionException.<init>(ExecutionException.java:90)
    com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:476)
    com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:357)
    org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter.close(CqlBulkRecordWriter.java:257)
    org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter.close(CqlBulkRecordWriter.java:237)
    org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$5.apply$mcV$sp(PairRDDFunctions.scala:1131)
    org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1359)
    org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1131)
    org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1102)
    org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    org.apache.spark.scheduler.Task.run(Task.scala:99)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:285)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    java.lang.Thread.run(Thread.java:748)

It seems the logic that sits right below the while loop in the code linked above, which checks for failed hosts/stream sessions, should perhaps have been inside the while loop?

Thanks,
Brett
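For illustration, here is a minimal standalone sketch of the pattern described above (the class and method names are made up for this sketch, not the actual Cassandra source). waitBuggy() mimics the suspected shape of the close() loop: ExecutionException is swallowed in the same handler as TimeoutException, so once the stream future has failed, get() returns immediately with a new ExecutionException on every iteration and the loop never exits. waitFixed() sketches the suggested change of surfacing the failure from inside the loop:

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class StreamWaitSketch {
    // Buggy shape: a failed future never exits the loop. Bounded here so
    // the sketch terminates; the real loop has no bound.
    static boolean waitBuggy(Future<?> streamFuture, int maxIters) {
        for (int i = 0; i < maxIters; i++) {
            try {
                streamFuture.get(10, TimeUnit.MILLISECONDS);
                return true; // stream finished cleanly
            } catch (ExecutionException | TimeoutException e) {
                // Swallowed: for a failed future, get() throws a fresh
                // ExecutionException immediately, so this busy-loops,
                // matching the thread dumps above.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false; // gave up (the unbounded real loop never gets here)
    }

    // Fixed shape: check for stream failure inside the loop by
    // rethrowing on ExecutionException instead of retrying.
    static void waitFixed(Future<?> streamFuture) throws IOException {
        while (true) {
            try {
                streamFuture.get(10, TimeUnit.MILLISECONDS);
                return;
            } catch (ExecutionException e) {
                throw new IOException("stream failed", e.getCause());
            } catch (TimeoutException e) {
                // Still running: report progress and keep waiting.
            } catch (InterruptedException e) {
                throw new IOException(e);
            }
        }
    }

    public static void main(String[] args) {
        // A future that has already failed, like the failed stream session.
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("Stream failed"));

        // The buggy loop burns through every iteration without exiting.
        System.out.println("buggy completed: " + waitBuggy(failed, 1000));

        // The fixed loop surfaces the failure immediately.
        try {
            waitFixed(failed);
        } catch (IOException e) {
            System.out.println("fixed threw: " + e.getCause().getMessage());
        }
    }
}
```

With this sketch the buggy variant exhausts its iteration budget without ever completing, while the fixed variant fails fast with the stream error as the cause.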