I would certainly try marking the Validator class as Serializable. If that
doesn't do it, you can also check whether this JVM flag sheds more light:
-Dsun.io.serialization.extendedDebugInfo=true
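
For what it's worth, here is a minimal sketch of what I mean by marking
Validator as Serializable. It is only an illustration under a few
assumptions: the definitions are compiled as top-level classes rather than
pasted into the shell, Seq[Double] stands in for Row so it runs without
Spark, and RoundTrip is a hypothetical helper that writes and reads a value
the way Java serialization would. Java serialization requires the first
non-serializable superclass to have an accessible no-arg constructor, which
is what "no valid constructor" complains about, and that may be what goes
wrong for a class nested inside the shell's $iwC wrappers. Making the base
class itself Serializable sidesteps the requirement:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// The base class is Serializable, so deserialization does not have to look
// for a no-arg constructor on it.
abstract class Validator extends Serializable {
  val shortsale_in_pos = 10

  // Seq[Double] is an assumption standing in for org.apache.spark.sql.Row.
  def validate(input: Seq[Double]): Validator
}

case object Nomatch extends Validator {
  def validate(input: Seq[Double]): Validator = this
}

case object Shortsale extends Validator {
  def validate(input: Seq[Double]): Validator =
    if (input(shortsale_in_pos) > 140.0) this else Nomatch
}

// Hypothetical helper: serialize a value to bytes and read it back.
object RoundTrip {
  def roundTrip[T](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

If RoundTrip.roundTrip(List[Validator](Shortsale, Nomatch)) comes back
without an exception, the list should also survive being shipped inside a
task closure.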

By "programming guide" I mean this:
https://spark.apache.org/docs/latest/programming-guide.html. I could almost
swear I had seen an extended section there on tricky serialization issues
(i.e. scenarios where you end up serializing more than you think because of
what your closure captures), but I can't locate that section now.
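
To illustrate the kind of scenario I have in mind, here is a small sketch
that runs without Spark. Thresholds and serializes are hypothetical names I
made up for the example. A function that reads a field of its enclosing
class holds a reference to the whole instance, so serializing the function
tries to serialize that instance too. Copying the field into a local val
first keeps the closure down to the value it actually needs:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCapture {
  // Hypothetical holder class, deliberately NOT Serializable.
  class Thresholds {
    val limit = 140.0

    // Reads the field inside the function body, so the closure holds `this`.
    def capturesThis: Double => Boolean = x => x > limit

    // Copies the field to a local first, so the closure holds only a Double.
    def capturesLocal: Double => Boolean = {
      val localLimit = limit
      x => x > localLimit
    }
  }

  // True when Java serialization accepts the value, false when it rejects it.
  def serializes(value: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(value)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Calling ClosureCapture.serializes on the two functions is a quick way to
see which of them drags the enclosing object along.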

On Mon, Jul 13, 2015 at 1:30 PM, <saif.a.ell...@wellsfargo.com> wrote:

> Thank you very much for your time. Here is how I designed the case
> classes; as far as I know, they are set up properly.
>
>
>
> PS: By the way, what do you mean by “the programming guide”?
>
>
>
> abstract class Validator {
>
>     // positions to access with Row.getInt(x)
>     val shortsale_in_pos = 10
>     val month_pos = 11
>     val foreclosure_start_dt_pos = 14
>     val filemonth_dt_pos = 12
>     val reo_start_dt_pos = 14
>     // ..
>
>     // redesign into Iterable of Rows -->
>     def validate(input: org.apache.spark.sql.Row): Validator
> }
>
> case object Nomatch extends Validator {
>     def validate(input: Row): Validator = this
> }
>
> case object Shortsale extends Validator {
>     def validate(input: Row): Validator = {
>         val check1: Boolean = input.getDouble(shortsale_in_pos) > 140.0
>         if (check1) this else Nomatch
>     }
> }
>
>
>
> Saif
>
>
>
> *From:* Yana Kadiyska [mailto:yana.kadiy...@gmail.com]
> *Sent:* Monday, July 13, 2015 2:16 PM
> *To:* Ellafi, Saif A.
> *Cc:* user@spark.apache.org
> *Subject:* Re: java.io.InvalidClassException
>
>
>
> It's a bit hard to tell from the snippets of code, but it's likely related
> to the fact that when you serialize instances, the enclosing class, if any,
> also gets serialized, as does any other object that the fields used in the
> closure come from. See, e.g., this discussion:
> http://stackoverflow.com/questions/9747443/java-io-invalidclassexception-no-valid-constructor
>
>
>
> The programming guide also has good advice on serialization issues. I
> would particularly check how Shortsale/Nomatch/Foreclosure are declared
> (I'd advise making them top-level case classes).
>
>
>
> On Mon, Jul 13, 2015 at 12:32 PM, <saif.a.ell...@wellsfargo.com> wrote:
>
> Hi,
>
>
>
> For some experiment I am doing, I am trying to do the following.
>
>
>
> 1. Created an abstract class Validator, then created case objects from
> Validator, each with a validate(row: Row): Boolean method.
>
> 2. Added all the case objects to a list.
>
> 3. Each validate takes a Row and returns “itself” if the check passes, so
> I then do this to return an arbitrary number for each match:
>
>
> def evaluate_paths(row: Row, validators: List[ClientPath]): Int = {
>
>     var result: Int = -1
>
>     for (validator <- validators) {
>         validator.validate(row) match {
>             case Shortsale => result = 0
>             case Foreclosure => result = 1
>             case Nomatch => result = 99
>             //...
>         }
>     }
>     result
> }
>
>
>
> val validators = List[ClientPath](
>     Shortsale,
>     Foreclosure)
>
>
>
> 4. Then I run map[Int](row => evaluate_paths(row, validators))
>
>
>
> But this blows up: it does not like the creation of the list of validators
> when executing an action on the RDD such as take(1).
>
> I have also tried an Iterator and an Array instead of a List, but with no
> luck. I also replaced the for loop with a while loop.
>
> Curiously, when I call evaluate_paths(some_row, validators) directly with
> custom-made Rows, the execution works properly.
>
>
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 125.0 failed 1 times, most recent failure: Lost task 0.0 in stage
> 125.0 (TID 830, localhost): java.io.InvalidClassException:
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Nomatch$;
> no valid constructor at
> java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)
> at java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:768)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1775)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>
> ...
>
> ...
>
> ...
>
> at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
> at
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:95)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58) at
> org.apache.spark.scheduler.Task.run(Task.scala:70) at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
> Driver stacktrace: at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at scala.Option.foreach(Option.scala:236) at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
>
>
> ------
>
>
>
> I would be grateful for any advice.
>
> Saif
>
>
>
>
>
