Hi,
For an experiment I am running, I am trying to do the following.
1. I created an abstract class Validator (named ClientPath in the code below)
and case objects that extend it, each with a validate(row: Row): Boolean method.
2. I added all the case objects to a list.
3. Each validate takes a Row and returns "itself" if the validation succeeds,
so I then do this to return an arbitrary number for each match:
def evaluate_paths(row: Row, validators: List[ClientPath]): Int = {
  var result: Int = -1  // stays -1 if the list of validators is empty
  for (validator <- validators) {
    // each validator overwrites the result of the previous one
    validator.validate(row) match {
      case Shortsale => result = 0
      case Foreclosure => result = 1
      case Nomatch => result = 99
      //...
    }
  }
  result
}

val validators = List[ClientPath](
  Shortsale,
  Foreclosure)
4. Then I run map[Int](row => evaluate_paths(row, validators)) on the RDD.
But this blows up: it does not like the list of validators when an action
such as take(1) is executed on the RDD.
I have also tried an Iterator and an Array instead of a List, to no avail. I
also replaced the for loop with a while loop.
Curiously, when I call evaluate_paths(some_row, validators) directly with
custom-made Rows, the execution works properly.
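A minimal sketch of the setup in steps 1 to 3, as it might be entered in spark-shell. The post gives both Boolean and "returns itself" for validate; the sketch assumes the latter, since evaluate_paths matches on case objects. The matching rules (a string in column 0) are hypothetical placeholders, and the real rules are not shown here.

```scala
import org.apache.spark.sql.Row

// Assumption: validate returns a ClientPath, either the matching
// case object or Nomatch, rather than a Boolean.
abstract class ClientPath {
  def validate(row: Row): ClientPath
}

// Returned when a row does not satisfy a validator.
case object Nomatch extends ClientPath {
  def validate(row: Row): ClientPath = this
}

case object Shortsale extends ClientPath {
  // Hypothetical rule: column 0 holds the string "shortsale".
  def validate(row: Row): ClientPath =
    if (row.getString(0) == "shortsale") this else Nomatch
}

case object Foreclosure extends ClientPath {
  // Hypothetical rule: column 0 holds the string "foreclosure".
  def validate(row: Row): ClientPath =
    if (row.getString(0) == "foreclosure") this else Nomatch
}

// Local check with a hand-built Row, no RDD involved.
val someRow = Row("shortsale")
Shortsale.validate(someRow)    // Shortsale
Foreclosure.validate(someRow)  // Nomatch
```

This only covers the local call with a hand-built Row, which is the case that works; the failure appears once the same objects are used inside the map on the RDD.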
The exception is:
org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 125.0 failed 1 times, most recent failure:
Lost task 0.0 in stage 125.0 (TID 830, localhost):
java.io.InvalidClassException:
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Nomatch$;
no valid constructor
    at java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)
    at java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:768)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1775)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
...
...
...
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:95)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
------
Any advice would be appreciated.
Saif