To add some context about JPMML and Spark: "The JPMML-Evaluator library
depends on JPMML-Model and Google Guava library versions that conflict
with the ones bundled with Apache Spark and/or Apache Hadoop. I solved
this with the Maven Shade plugin, relocating JPMML-Evaluator to a
different namespace."
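For reference, the relocation in my pom.xml looks roughly like this (a
minimal sketch; the Guava pattern is my assumption, and version tags and
the rest of the shade configuration are omitted):

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <relocations>
                    <!-- Move the JPMML model classes out of the way of
                         the copies bundled with Spark/Hadoop -->
                    <relocation>
                        <pattern>org.dmg.pmml</pattern>
                        <shadedPattern>org.dmg.pmml2_7</shadedPattern>
                    </relocation>
                    <!-- Same idea for Guava (pattern is an assumption) -->
                    <relocation>
                        <pattern>com.google.common</pattern>
                        <shadedPattern>relocated.com.google.common</shadedPattern>
                    </relocation>
                </relocations>
            </configuration>
        </execution>
    </executions>
</plugin>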

When I remove the change in namespace from pom.xml, I get this error:

Exception in thread "main" org.jpmml.evaluator.InvalidFeatureException (at or around line 7): DataDictionary
    at org.jpmml.evaluator.CacheUtil.getValue(CacheUtil.java:59)
    at org.jpmml.evaluator.ModelEvaluator.<init>(ModelEvaluator.java:108)
    at org.jpmml.evaluator.ModelEvaluator.<init>(ModelEvaluator.java:100)
    at org.jpmml.evaluator.RegressionModelEvaluator.<init>(RegressionModelEvaluator.java:43)
    at com.x.ds.util.Predict.<init>(PredictConversion.java:24)
    at runner.SparkSimRunner.runSimulations(SparkSimRunner.java:133)
    at runner.SparkSimRunner.runSparkJob(SparkSimRunner.java:122)
    at runner.SparkRunner.main(SparkRunner.java:9)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


And when I add it back again, I get the error I shared in my previous
email (quoted below). Not sure what's going on; my guess is that without
the relocation the PMML is unmarshalled with the older JPMML-Model
classes that Spark/Hadoop bundle, which would explain the evaluator
rejecting the DataDictionary. Has anyone successfully used JPMML to
import a PMML file in Spark?
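
For what it's worth, one workaround I am considering for the
serialization error is to keep the evaluator out of Java serialization
entirely: mark the field transient and rebuild it lazily on first use,
so each executor JVM parses the PMML once and the non-serializable SAX
Locator never leaves the driver. A minimal sketch, based on the
Prediction class quoted below (package names are the unshaded ones and
the resource lookup is a placeholder; adjust for the relocated
namespace):

import java.io.InputStream;
import java.io.Serializable;
import java.util.Map;
import javax.xml.transform.Source;
import org.dmg.pmml.FieldName;
import org.dmg.pmml.PMML;
import org.jpmml.evaluator.RegressionModelEvaluator;
import org.jpmml.model.ImportFilter;
import org.jpmml.model.JAXBUtil;
import org.xml.sax.InputSource;

public class Prediction implements Serializable {

    // transient: never serialized into the task closure
    private transient RegressionModelEvaluator rme;

    private RegressionModelEvaluator evaluator() throws Exception {
        if (rme == null) {
            // rebuilt once per executor JVM after deserialization
            try (InputStream is = Prediction.class.getResourceAsStream("model.pmml")) {
                Source source = ImportFilter.apply(new InputSource(is));
                PMML model = JAXBUtil.unmarshalPMML(source);
                rme = new RegressionModelEvaluator(model);
            }
        }
        return rme;
    }

    public Map<FieldName, ?> predict(Map<FieldName, ?> params) throws Exception {
        return evaluator().evaluate(params);
    }
}

JPMML-Model also seems to ship a visitor for stripping the SAX locators
from an unmarshalled model (org.jpmml.model.visitors.LocatorNullifier,
if I have the name right), which should make the PMML object itself
serializable.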


On Wed, Dec 9, 2015 at 11:01 AM, Utkarsh Sengar <utkarsh2...@gmail.com>
wrote:

> I am trying to load a PMML file in a Spark job, instantiate it only once,
> and pass it to the executors. But I get a NotSerializableException for
> org.xml.sax.helpers.LocatorImpl, which is used inside JPMML.
>
> I have this class Prediction.java:
> public class Prediction implements Serializable {
>     private RegressionModelEvaluator rme;
>
>     public Prediction() throws Exception {
>         InputStream is = .....getResourceAsStream("model.pmml");
>         Source source = ImportFilter.apply(new InputSource(is));
>         PMML model = JAXBUtil.unmarshalPMML(source);
>         rme = new RegressionModelEvaluator(model);
>         is.close();
>     }
>
>     public Map predict(params) {
>        ......
>         return rme.evaluate(params);
>     }
> }
>
>
> Now I want to instantiate it only once, since the
> "JAXBUtil.unmarshalPMML(source)" step takes about 2-3 seconds. It works
> fine if I instantiate it inside the map{}.
>
> So I do this in my driver:
>
> Prediction prediction = new Prediction();
> JavaRDD<String> result = rdd1
>                 .cartesian(rdd2)
>                 .map(t -> {...<need to use "prediction" here ...});
>
> But when I do that, I get this error:
> Caused by: java.io.NotSerializableException: org.xml.sax.helpers.LocatorImpl
> Serialization stack:
>     - object not serializable (class: org.xml.sax.helpers.LocatorImpl, value: org.xml.sax.helpers.LocatorImpl@6bce4140)
>     - field (class: org.dmg.pmml2_7.PMMLObject, name: locator, type: interface org.xml.sax.Locator)
>     - object (class org.dmg.pmml2_7.PMML, org.dmg.pmml2_7.PMML@722531ab)
>     - field (class: org.jpmml.evaluator.PMMLManager, name: pmml, type: class org.dmg.pmml2_7.PMML)
>     - object (class org.jpmml.evaluator.RegressionModelEvaluator, org.jpmml.evaluator.RegressionModelEvaluator@4c24f3a2)
>     - field (class: com.x.ds.util.Predict, name: rme, type: class org.jpmml.evaluator.RegressionModelEvaluator)
>     - object (class com.x.ds.util.Predict, com.x.ds.util.Predict@28f154cc)
>     - element of array (index: 1)
>     - array (class [Ljava.lang.Object;, size 2)
>     - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
>     - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class runner.SparkSimRunner, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeSpecial runner/SparkSimRunner.lambda$runSimulations$ad93f7a8$1:(Lcom/x/ds/util/Predict;Lscala/Tuple2;)Ljava/util/List;, instantiatedMethodType=(Lscala/Tuple2;)Ljava/util/List;, numCaptured=2])
>     - writeReplace data (class: java.lang.invoke.SerializedLambda)
>     - object (class runner.SparkSimRunner$$Lambda$31/1961138094, runner.SparkSimRunner$$Lambda$31/1961138094@782fd504)
>     - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
>     - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
>     at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
>
> Am I doing this right?
>
> --
> Thanks,
> -Utkarsh
>



-- 
Thanks,
-Utkarsh
