I would certainly try marking the Validator class as Serializable... If that doesn't do it, you can also see whether this flag sheds more light: -Dsun.io.serialization.extendedDebugInfo=true
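Concretely, the first suggestion is just the following (a minimal sketch based on the snippet quoted further down; everything else stays as you have it):

abstract class Validator extends Serializable {
  // positions to access with Row.getInt(x), as in your version
  val shortsale_in_pos = 10
  // ...
  def validate(input: org.apache.spark.sql.Row): Validator
}

The debug flag needs to reach both JVMs: you can pass it to the driver with spark-shell's/spark-submit's --driver-java-options and to the executors via the spark.executor.extraJavaOptions setting.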
By "programming guide" I mean this: https://spark.apache.org/docs/latest/programming-guide.html
I could almost swear I had seen an extended section there on tricky serialization issues (i.e. scenarios where you end up serializing more than you think because of what your closure captures), but I can't locate that section now... (For the kind of top-level, Serializable declaration I have in mind, see the sketch after the quoted thread below.)

On Mon, Jul 13, 2015 at 1:30 PM, <saif.a.ell...@wellsfargo.com> wrote:

> Thank you very much for your time. Here is how I designed the case
> classes; as far as I know they apply properly.
>
> PS: By the way, what do you mean by "the programming guide"?
>
> abstract class Validator {
>
>   // positions to access with Row.getInt(x)
>   val shortsale_in_pos = 10
>   val month_pos = 11
>   val foreclosure_start_dt_pos = 14
>   val filemonth_dt_pos = 12
>   val reo_start_dt_pos = 14
>   // ..
>
>   // redesign into Iterable of Rows -->
>   def validate(input: org.apache.spark.sql.Row): Validator
> }
>
> case object Nomatch extends Validator {
>   def validate(input: Row): Validator = this
> }
>
> case object Shortsale extends Validator {
>   def validate(input: Row): Validator = {
>     val check1: Boolean = input.getDouble(shortsale_in_pos) > 140.0
>     if (check1) this else Nomatch
>   }
> }
>
> Saif
>
> *From:* Yana Kadiyska [mailto:yana.kadiy...@gmail.com]
> *Sent:* Monday, July 13, 2015 2:16 PM
> *To:* Ellafi, Saif A.
> *Cc:* user@spark.apache.org
> *Subject:* Re: java.io.InvalidClassException
>
> It's a bit hard to tell from the snippets of code, but it's likely related
> to the fact that when you serialize instances, the enclosing class, if any,
> also gets serialized, as does any other place the fields used in the
> closure come from... e.g. check this discussion:
> http://stackoverflow.com/questions/9747443/java-io-invalidclassexception-no-valid-constructor
>
> The programming guide also has good advice on serialization issues. I
> would particularly check how Shortsale/Nomatch/Foreclosure are declared
> (I'd advise making them top-level case classes)...
>
> On Mon, Jul 13, 2015 at 12:32 PM, <saif.a.ell...@wellsfargo.com> wrote:
>
> Hi,
>
> For an experiment I am doing, I am trying the following.
>
> 1. Created an abstract class Validator, and case objects extending
> Validator, each with a validate(row: Row): Validator method.
>
> 2. Added all the case objects to a list.
>
> 3. Each validate takes a Row and returns *"itself"* if its check passes,
> so I then do this to return an arbitrary number for each match:
>
> def evaluate_paths(row: Row, validators: List[ClientPath]): Int = {
>
>   var result: Int = -1
>
>   for (validator <- validators) {
>     validator.validate(row) match {
>       case Shortsale => result = 0
>       case Foreclosure => result = 1
>       case Nomatch => result = 99
>       // ...
>     }
>   }
>   result
> }
>
> val validators = List[ClientPath](
>   Shortsale,
>   Foreclosure)
>
> 4. Then I run map[Int](row => evaluate_paths(row, validators)).
>
> But this blows up: it does not like the creation of the list of validators
> when an action such as take(1) is executed on the RDD.
>
> I have also tried an Iterator and an Array instead of a List, with no
> luck, and replaced the for loop with a while loop.
>
> Curiously, when I try with custom-made Rows, the execution works properly
> when calling evaluate_paths(some_row, validators) directly.
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 125.0 failed 1 times, most recent failure: Lost task 0.0 in stage 125.0 (TID 830, localhost): java.io.InvalidClassException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Nomatch$; no valid constructor
>   at java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)
>   at java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:768)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1775)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>   ...
>   ...
>   ...
>   at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
>   at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:95)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
>   at org.apache.spark.scheduler.Task.run(Task.scala:70)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>   at scala.Option.foreach(Option.scala:236)
>   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
> ------
>
> Any advice appreciated,
>
> Saif
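For reference, here is a minimal sketch of the pattern suggested above: the whole hierarchy declared at the top level (ideally compiled into a jar and shipped with --jars rather than typed into the shell), with Validator extending Serializable. Names follow the quoted snippets (using Validator where the original used ClientPath); only Shortsale and Nomatch are shown, and df stands in for whatever DataFrame (or RDD[Row]) is being validated:

import org.apache.spark.sql.Row

abstract class Validator extends Serializable {
  // column positions used by the concrete validators
  val shortsale_in_pos = 10
  def validate(input: Row): Validator
}

case object Nomatch extends Validator {
  def validate(input: Row): Validator = this
}

case object Shortsale extends Validator {
  def validate(input: Row): Validator =
    if (input.getDouble(shortsale_in_pos) > 140.0) this else Nomatch
}

// -1 means nothing matched; each successful match overwrites the result
def evaluate_paths(row: Row, validators: List[Validator]): Int = {
  var result = -1
  for (validator <- validators) {
    validator.validate(row) match {
      case Shortsale => result = 0
      case Nomatch   => result = 99
      case _         => // other validators (e.g. Foreclosure) would go here
    }
  }
  result
}

val validators = List[Validator](Shortsale)

// df is an assumed DataFrame with the expected column layout
val labels = df.map(row => evaluate_paths(row, validators))
labels.take(1)

The point is that nothing in the hierarchy drags in a non-serializable enclosing instance (the $iwC... wrappers in the stack trace are the spark-shell REPL's enclosing classes), so the case objects can be shipped to the executors cleanly.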