Hi,

For an experiment I am working on, I am trying to do the following.

1. Created an abstract class Validator (ClientPath in the code below). Created 
case objects extending it, each with a validate(row: Row) method that returns 
the case object itself when the row matches (and Nomatch otherwise).
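
Roughly like this (a trimmed sketch; the column checks inside each validate are 
placeholder assumptions, not my real logic):

import org.apache.spark.sql.Row

abstract class ClientPath {
    // returns "this" when the row matches this path, Nomatch otherwise
    def validate(row: Row): ClientPath
}

case object Shortsale extends ClientPath {
    // placeholder condition for illustration
    def validate(row: Row): ClientPath =
        if (row.getString(0) == "shortsale") this else Nomatch
}

case object Foreclosure extends ClientPath {
    // placeholder condition for illustration
    def validate(row: Row): ClientPath =
        if (row.getString(0) == "foreclosure") this else Nomatch
}

case object Nomatch extends ClientPath {
    def validate(row: Row): ClientPath = this
}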

2. Added all the case objects to a list.

3. Each validate looks at a Row and returns "itself" on a match, so I do the 
following to turn each match into an arbitrary number:

def evaluate_paths(row: Row, validators: List[ClientPath]): Int = {
    var result: Int = -1
    for (validator <- validators) {
        validator.validate(row) match {
            case Shortsale   => result = 0
            case Foreclosure => result = 1
            case Nomatch     => result = 99
            //...
        }
    }
    result
}

val validators = List[ClientPath](
    Shortsale,
    Foreclosure)

4. Then I run map[Int](row => evaluate_paths(row, validators)) over the 
DataFrame, as sketched below.
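
That is, roughly (df stands in for the input DataFrame; the name is just for 
illustration):

val results = df.map[Int](row => evaluate_paths(row, validators))
results.take(1)   // the failure appears here, at the first action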

But this blows up: it does not like the list of validators once an action such 
as take(1) is executed on the RDD. I have also tried an Iterator and an Array 
instead of a List, with no luck, and replaced the for loop with a while loop.
Curiously, when I call evaluate_paths(some_row, validators) directly on the 
driver with custom-made Rows, the execution works properly.
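
That is, something like this runs fine on the driver (the Row contents are 
made up for illustration):

import org.apache.spark.sql.Row

val some_row = Row("shortsale", 100.0)   // hand-made row, arbitrary values
evaluate_paths(some_row, validators)     // returns the expected Int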

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 125.0 failed 1 times, most recent failure: Lost task 0.0 in stage 125.0 (TID 830, localhost): java.io.InvalidClassException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Nomatch$; no valid constructor
    at java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)
    at java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:768)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1775)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    ...
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:95)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


Grateful for any advice,
Saif
