To add some context about JPMML and Spark: "The JPMML-Evaluator library
depends on JPMML-Model and Google Guava library versions that are in
conflict with the ones that are bundled with Apache Spark and/or Apache
Hadoop." I solved this by using the Maven Shade plugin to relocate
JPMML-Evaluator into a different namespace.
When I remove the namespace relocation from pom.xml, I get this error:
Exception in thread "main" org.jpmml.evaluator.InvalidFeatureException (at or around line 7): DataDictionary
    at org.jpmml.evaluator.CacheUtil.getValue(CacheUtil.java:59)
    at org.jpmml.evaluator.ModelEvaluator.<init>(ModelEvaluator.java:108)
    at org.jpmml.evaluator.ModelEvaluator.<init>(ModelEvaluator.java:100)
    at org.jpmml.evaluator.RegressionModelEvaluator.<init>(RegressionModelEvaluator.java:43)
    at com.x.ds.util.Predict.<init>(PredictConversion.java:24)
    at runner.SparkSimRunner.runSimulations(SparkSimRunner.java:133)
    at runner.SparkSimRunner.runSparkJob(SparkSimRunner.java:122)
    at runner.SparkRunner.main(SparkRunner.java:9)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
And when I add it back, I get the error I shared in my previous email.
I'm not sure what's going on. Has anyone successfully used JPMML to import
a PMML file in Spark?
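When two copies of a library compete on the classpath, it can help to check which jar a class was actually loaded from. Below is a minimal sketch in plain Java; `ClassOrigin` and `locate` are hypothetical names, not part of JPMML or Spark. Calling `locate("org.dmg.pmml.PMML")` or `locate("com.google.common.base.Preconditions")` from the driver (and from inside a task) would presumably show whether the application jar or Spark's own assembly is supplying those classes.

```java
import java.net.URL;
import java.security.CodeSource;

// Diagnostic sketch (hypothetical helper): report which jar or directory a
// class was loaded from, to see which copy of a conflicting library wins.
public class ClassOrigin {

    /** Returns the code-source location of the named class, or a short marker. */
    public static String locate(String className) {
        try {
            Class<?> cls = Class.forName(className);
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            if (src == null) {
                // Classes from the bootstrap loader (e.g. java.lang.*) have no code source.
                return "bootstrap";
            }
            URL location = src.getLocation();
            return location == null ? "unknown" : location.toString();
        } catch (ClassNotFoundException e) {
            return "not found";
        }
    }

    public static void main(String[] args) {
        // Pass the fully qualified class names to check on the command line.
        for (String name : args) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

Classes loaded by the bootstrap class loader report no code source, hence the `"bootstrap"` marker.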
On Wed, Dec 9, 2015 at 11:01 AM, Utkarsh Sengar <[email protected]> wrote:
> I am trying to load a PMML file in a Spark job, instantiate it only once
> and pass it to the executors. But I get a NotSerializableException for
> org.xml.sax.helpers.LocatorImpl, which is used inside JPMML.
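The exception can be reproduced without Spark or JPMML: Java serialization walks the whole object graph and fails on the first field whose class does not implement `Serializable`. In this sketch, `Locator` and `Wrapper` are hypothetical stand-ins for `org.xml.sax.helpers.LocatorImpl` and the PMML/evaluator objects that hold it.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Minimal reproduction: a Serializable wrapper whose field graph reaches a
// non-Serializable object cannot be written with Java serialization.
public class SerializationRepro {

    // Stand-in for org.xml.sax.helpers.LocatorImpl: does NOT implement Serializable.
    static class Locator {
        int line = 7;
    }

    // Stand-in for the evaluator/PMML object graph that holds a Locator.
    static class Wrapper implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Locator locator = new Locator();
    }

    /** Serializes obj; returns null on success or the offending class name on failure. */
    public static String tryToSerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return null;
        } catch (NotSerializableException e) {
            // The exception message is the name of the non-serializable class.
            return e.getMessage();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(tryToSerialize(new Wrapper()));
    }
}
```

`NotSerializableException` carries the offending class name as its message, which is why the Spark error names `LocatorImpl` rather than the `Prediction` class that wraps it.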
>
> I have this class Prediction.java:
> public class Prediction implements Serializable {
> private RegressionModelEvaluator rme;
>
> public Prediction() throws Exception {
> InputStream is = .....getResourceAsStream("model.pmml");
> Source source = ImportFilter.apply(new InputSource(is));
> PMML model = JAXBUtil.unmarshalPMML(source);
> rme = new RegressionModelEvaluator(model);
> is.close();
> }
>
> public Map<FieldName, ?> predict(Map<FieldName, ?> params) {
> ......
> return rme.evaluate(params);
> }
> }
>
>
> Now I want to instantiate it only once, since the
> "JAXBUtil.unmarshalPMML(source)" step takes about 2-3 seconds. It works
> fine if I instantiate it inside the map{}.
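One way to pay the 2-3 second parsing cost at most once per JVM, without shipping the evaluator from the driver, is a lazily initialized static holder. This is a sketch under assumptions: `ModelHolder` and `loadModel()` are hypothetical, and `loadModel()` stands in for the `ImportFilter` / `JAXBUtil.unmarshalPMML` / `RegressionModelEvaluator` steps in the class above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: build the model at most once per JVM (driver or executor) and never
// serialize it. loadModel() is a hypothetical placeholder for the PMML parsing.
public final class ModelHolder {

    // Counts how many times the expensive load ran (for illustration only).
    static final AtomicInteger LOADS = new AtomicInteger();

    private ModelHolder() {}

    // Initialization-on-demand holder: the JVM runs Lazy's static initializer
    // once, on first access, and publishes the result safely to all threads.
    private static final class Lazy {
        static final Object MODEL = loadModel();
    }

    private static Object loadModel() {
        LOADS.incrementAndGet();
        // Hypothetical placeholder for reading model.pmml and building the evaluator.
        return new Object();
    }

    /** Returns the shared, lazily created model instance. */
    public static Object get() {
        return Lazy.MODEL;
    }
}
```

Inside `map`, calling `ModelHolder.get()` instead of referencing a driver-side `prediction` variable means the lambda captures nothing, so nothing has to be serialized; each executor JVM would build its own instance on first use. If several tasks share one executor, the evaluator would also need to be safe for concurrent use, which I have not verified for `RegressionModelEvaluator`.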
>
> So I do this in my driver:
>
> Prediction prediction = new Prediction();
> JavaRDD<String> result = rdd1
> .cartesian(rdd2)
> .map(t -> {...<need to use "prediction" here ...});
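The serialization stack that follows points at `SerializedLambda.capturedArgs`: a lambda whose functional interface is `Serializable` (as Spark's `Function` is) stores every captured variable, so serializing the lambda also serializes `prediction`. A self-contained sketch of that behaviour, where `SerFn` and `Heavy` are hypothetical stand-ins for Spark's `Function` and the `Prediction` class:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

// Sketch: a serializable lambda serializes whatever it captures.
public class LambdaCapture {

    // Serializable functional interface, similar in spirit to Spark's Function.
    interface SerFn<T, R> extends Function<T, R>, Serializable {}

    // Non-serializable stand-in for the Prediction object.
    static class Heavy {
        String predict(String in) { return "p(" + in + ")"; }
    }

    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    /** Captures a driver-side instance, so the lambda is not serializable. */
    static SerFn<String, String> capturing() {
        Heavy heavy = new Heavy();
        return in -> heavy.predict(in);
    }

    /** Captures nothing; the instance is obtained where the lambda runs. */
    static SerFn<String, String> nonCapturing() {
        return in -> new Heavy().predict(in);
    }
}
```

The non-capturing version creates a new `Heavy` per call only to keep the sketch short; in practice it would look up a shared, lazily built instance instead.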
>
> But when I do that, I get this error:
> Caused by: java.io.NotSerializableException: org.xml.sax.helpers.LocatorImpl
> Serialization stack:
>     - object not serializable (class: org.xml.sax.helpers.LocatorImpl, value: org.xml.sax.helpers.LocatorImpl@6bce4140)
>     - field (class: org.dmg.pmml2_7.PMMLObject, name: locator, type: interface org.xml.sax.Locator)
>     - object (class org.dmg.pmml2_7.PMML, org.dmg.pmml2_7.PMML@722531ab)
>     - field (class: org.jpmml.evaluator.PMMLManager, name: pmml, type: class org.dmg.pmml2_7.PMML)
>     - object (class org.jpmml.evaluator.RegressionModelEvaluator, org.jpmml.evaluator.RegressionModelEvaluator@4c24f3a2)
>     - field (class: com.x.ds.util.Predict, name: rme, type: class org.jpmml.evaluator.RegressionModelEvaluator)
>     - object (class com.x.ds.util.Predict, com.x.ds.util.Predict@28f154cc)
>     - element of array (index: 1)
>     - array (class [Ljava.lang.Object;, size 2)
>     - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
>     - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class runner.SparkSimRunner, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeSpecial runner/SparkSimRunner.lambda$runSimulations$ad93f7a8$1:(Lcom/x/ds/util/Predict;Lscala/Tuple2;)Ljava/util/List;, instantiatedMethodType=(Lscala/Tuple2;)Ljava/util/List;, numCaptured=2])
>     - writeReplace data (class: java.lang.invoke.SerializedLambda)
>     - object (class runner.SparkSimRunner$$Lambda$31/1961138094, runner.SparkSimRunner$$Lambda$31/1961138094@782fd504)
>     - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
>     - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
>     at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
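The serialization stack shows the chain `Predict.rme` -> `PMMLManager.pmml` -> `PMMLObject.locator` -> `LocatorImpl`. A common workaround is to mark the evaluator field `transient` and rebuild it lazily on the receiving side. A minimal sketch, assuming the model can be rebuilt from a resource name such as `model.pmml`; `LazyPrediction`, `Evaluator` and `roundTrip` are hypothetical and only mimic the shape of the `Prediction` class:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Sketch: keep the non-serializable evaluator out of the serialized form
// (transient) and rebuild it on first use after deserialization.
public class LazyPrediction implements Serializable {
    private static final long serialVersionUID = 1L;

    // Stand-in for RegressionModelEvaluator; deliberately NOT Serializable.
    static class Evaluator {
        private final String source;
        Evaluator(String source) { this.source = source; }
        String evaluate(String params) { return source + ":" + params; }
    }

    // Serializable "recipe" for rebuilding the evaluator, e.g. a resource name.
    private final String resourceName;
    // Skipped by Java serialization; null again after deserialization.
    private transient Evaluator evaluator;

    public LazyPrediction(String resourceName) {
        this.resourceName = resourceName;
    }

    // Not synchronized: assumes one thread uses each deserialized copy.
    private Evaluator evaluator() {
        if (evaluator == null) {
            // Real code would parse the PMML file here.
            evaluator = new Evaluator(resourceName);
        }
        return evaluator;
    }

    public String predict(String params) {
        return evaluator().evaluate(params);
    }

    boolean isLoaded() {
        return evaluator != null;
    }

    // Serializes and deserializes obj in memory, mimicking shipping it to an executor.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

If I understand Spark's task execution correctly, each task deserializes its own copy of the closure, so this pattern would parse the model once per task (partition) rather than once per executor; keeping the evaluator in a static, per-JVM field is one way around that.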
>
> Am I doing this right?
>
> --
> Thanks,
> -Utkarsh
>