Hi folks,

I am noticing my Spark jobs getting stuck when using 
org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter/CqlBulkOutputFormat.


Based on the code, it seems that whenever there is a stream failure, an 
infinite loop may actually be the expected behavior.

Here are the logs from one executor:
19/04/03 15:35:06 INFO streaming.StreamResultFuture: [Stream 
#59290530-5625-11e9-a2bb-8bc7b49d56b0] Session with /10.82.204.173 is complete
19/04/03 15:35:06 WARN streaming.StreamResultFuture: [Stream 
#59290530-5625-11e9-a2bb-8bc7b49d56b0] Stream failed


On stream failure, StreamResultFuture sets the exception on the 
AbstractFuture. AFAIK this causes each subsequent call to AbstractFuture.get() 
to throw a new ExecutionException wrapping that failure.
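To double-check that behavior, here is a small demo using the JDK's CompletableFuture as a stand-in for Guava's AbstractFuture (both wrap the stored failure in a fresh ExecutionException on every get() call), which matches the changing memory addresses I saw in the thread dumps:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class FailedFutureGet {
    public static void main(String[] args) {
        // Stand-in for Guava's AbstractFuture: once the future is completed
        // exceptionally, every get() throws a *new* ExecutionException
        // wrapping the same original cause.
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new RuntimeException("Stream failed"));

        ExecutionException first = null, second = null;
        try { future.get(); } catch (ExecutionException | InterruptedException e) { first = (ExecutionException) e; }
        try { future.get(); } catch (ExecutionException | InterruptedException e) { second = (ExecutionException) e; }

        // Distinct wrapper objects on each call, identical cause.
        System.out.println("distinct wrappers: " + (first != second));
        System.out.println("same cause: " + (first.getCause() == second.getCause()));
    }
}
```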

The problem seems to be that CqlBulkRecordWriter swallows the 
ExecutionException and continues in its while loop:
https://github.com/apache/cassandra/blob/207c80c1fd63dfbd8ca7e615ec8002ee8983c5d6/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java#L256-L274
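Paraphrasing the loop in close() (this is a reconstruction from the stack trace, not the exact source), the hang mechanism looks like this; the iteration cap here exists only so the demo terminates, whereas the real loop has none:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SwallowedExceptionLoop {
    public static void main(String[] args) {
        // Simulate a stream future that has already failed:
        // every get() call throws a fresh ExecutionException.
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new RuntimeException("Stream failed"));

        // Paraphrase of the close() loop: ExecutionException is caught
        // together with TimeoutException and swallowed, so "break" is
        // never reached for a failed stream.
        int attempts = 0;
        boolean completed = false;
        while (attempts < 5) {  // cap added for the demo only
            attempts++;
            try {
                future.get(10, TimeUnit.MILLISECONDS);
                completed = true;
                break;
            } catch (ExecutionException | TimeoutException swallowed) {
                // In CqlBulkRecordWriter this branch just reports Hadoop
                // progress and loops again -- the failure never surfaces.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        System.out.println("completed=" + completed + " attempts=" + attempts);
    }
}
```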

Taking consecutive thread dumps of the same process, I see that the only 
thread doing work is constantly creating new ExecutionExceptions (the memory 
address of the ExecutionException was different in each thread dump):
java.lang.Throwable.fillInStackTrace(Native Method)
java.lang.Throwable.fillInStackTrace(Throwable.java:783) => holding 
Monitor(java.util.concurrent.ExecutionException@80240763)
java.lang.Throwable.<init>(Throwable.java:310)
java.lang.Exception.<init>(Exception.java:102)
java.util.concurrent.ExecutionException.<init>(ExecutionException.java:90)
com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:476)
com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:357)
org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter.close(CqlBulkRecordWriter.java:257)
org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter.close(CqlBulkRecordWriter.java:237)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$5.apply$mcV$sp(PairRDDFunctions.scala:1131)
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1359)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1131)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1102)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
org.apache.spark.scheduler.Task.run(Task.scala:99)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:285)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)

It seems the logic just below the while loop in the code linked above, which 
checks for failed hosts/stream sessions, should perhaps have been inside the 
while loop?
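For illustration, one possible shape of a fix: catch ExecutionException separately and rethrow instead of swallowing it, so the writer fails fast. The method name and message below are hypothetical, not the actual Cassandra API:

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FixSketch {
    // Hypothetical replacement for the wait loop in close(): a timeout
    // means "still streaming, report progress and retry", but an
    // ExecutionException means the stream failed and must be surfaced.
    static void waitForCompletion(Future<?> future) throws IOException {
        while (true) {
            try {
                future.get(100, TimeUnit.MILLISECONDS);
                break;
            } catch (ExecutionException e) {
                throw new IOException("Stream failed during bulk load", e.getCause());
            } catch (TimeoutException stillStreaming) {
                // still running: report Hadoop progress here and keep waiting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException(e);
            }
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("Stream failed"));
        try {
            waitForCompletion(failed);
            System.out.println("no exception");
        } catch (IOException expected) {
            System.out.println("failed fast: " + expected.getMessage());
        }
    }
}
```

Alternatively, moving the existing failed-host/session check inside the loop would have a similar effect of breaking out once a failure is known.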

Thanks,

Brett
