To add some context about JPMML and Spark: "The JPMML-Evaluator library depends on JPMML-Model and Google Guava library versions that conflict with the ones bundled with Apache Spark and/or Apache Hadoop. I solved this with the Maven Shade plugin, by relocating JPMML-Evaluator to a different namespace."
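For reference, that kind of relocation can be sketched in pom.xml roughly like this. This is a sketch only: the `org.jpmml.shaded` prefix is an arbitrary illustrative choice, not necessarily the namespace used in this project (the traces below suggest `org.dmg.pmml2_7` was used there):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <!-- Move JPMML's Guava and PMML model classes out of the way
               of the versions bundled with Spark/Hadoop. -->
          <relocation>
            <pattern>com.google.common</pattern>
            <shadedPattern>org.jpmml.shaded.com.google.common</shadedPattern>
          </relocation>
          <relocation>
            <pattern>org.dmg.pmml</pattern>
            <shadedPattern>org.jpmml.shaded.org.dmg.pmml</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

The shaded jar then carries its own private copies of those classes, so whatever versions Spark puts on the classpath no longer clash with JPMML's.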
When I remove the namespace change from pom.xml, I get this error:

    Exception in thread "main" org.jpmml.evaluator.InvalidFeatureException (at or around line 7): DataDictionary
        at org.jpmml.evaluator.CacheUtil.getValue(CacheUtil.java:59)
        at org.jpmml.evaluator.ModelEvaluator.<init>(ModelEvaluator.java:108)
        at org.jpmml.evaluator.ModelEvaluator.<init>(ModelEvaluator.java:100)
        at org.jpmml.evaluator.RegressionModelEvaluator.<init>(RegressionModelEvaluator.java:43)
        at com.x.ds.util.Predict.<init>(PredictConversion.java:24)
        at runner.SparkSimRunner.runSimulations(SparkSimRunner.java:133)
        at runner.SparkSimRunner.runSparkJob(SparkSimRunner.java:122)
        at runner.SparkRunner.main(SparkRunner.java:9)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

And when I add it back, I get the error I shared in my previous email. I'm not sure what's going on. Has anyone successfully used JPMML to import a PMML file in Spark?

On Wed, Dec 9, 2015 at 11:01 AM, Utkarsh Sengar <utkarsh2...@gmail.com> wrote:

> I am trying to load a PMML file in a Spark job. I want to instantiate it
> only once and pass it to the executors, but I get a
> NotSerializableException for org.xml.sax.helpers.LocatorImpl, which is
> used inside JPMML.
>
> I have this class Prediction.java:
>
>     public class Prediction implements Serializable {
>         private RegressionModelEvaluator rme;
>
>         public Prediction() throws Exception {
>             InputStream is = .....getResourceAsStream("model.pmml");
>             Source source = ImportFilter.apply(new InputSource(is));
>             PMML model = JAXBUtil.unmarshalPMML(source);
>             rme = new RegressionModelEvaluator(model);
>             is.close();
>         }
>
>         public Map predict(params) {
>             ......
>             return rme.evaluate(params);
>         }
>     }
>
> Now I want to instantiate it only once, since the
> JAXBUtil.unmarshalPMML(source) step takes about 2-3 seconds. It works
> fine if I instantiate it inside the map{}.
>
> So I do this in my driver:
>
>     Prediction prediction = new Prediction();
>     JavaRDD<String> result = rdd1
>         .cartesian(rdd2)
>         .map(t -> {...<need to use "prediction" here ...});
>
> But when I do that, I get this error:
>
>     Caused by: java.io.NotSerializableException: org.xml.sax.helpers.LocatorImpl
>     Serialization stack:
>         - object not serializable (class: org.xml.sax.helpers.LocatorImpl, value: org.xml.sax.helpers.LocatorImpl@6bce4140)
>         - field (class: org.dmg.pmml2_7.PMMLObject, name: locator, type: interface org.xml.sax.Locator)
>         - object (class org.dmg.pmml2_7.PMML, org.dmg.pmml2_7.PMML@722531ab)
>         - field (class: org.jpmml.evaluator.PMMLManager, name: pmml, type: class org.dmg.pmml2_7.PMML)
>         - object (class org.jpmml.evaluator.RegressionModelEvaluator, org.jpmml.evaluator.RegressionModelEvaluator@4c24f3a2)
>         - field (class: com.x.ds.util.Predict, name: rme, type: class org.jpmml.evaluator.RegressionModelEvaluator)
>         - object (class com.x.ds.util.Predict, com.x.ds.util.Predict@28f154cc)
>         - element of array (index: 1)
>         - array (class [Ljava.lang.Object;, size 2)
>         - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
>         - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class runner.SparkSimRunner, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeSpecial runner/SparkSimRunner.lambda$runSimulations$ad93f7a8$1:(Lcom/x/ds/util/Predict;Lscala/Tuple2;)Ljava/util/List;, instantiatedMethodType=(Lscala/Tuple2;)Ljava/util/List;, numCaptured=2])
>         - writeReplace data (class: java.lang.invoke.SerializedLambda)
>         - object (class runner.SparkSimRunner$$Lambda$31/1961138094, runner.SparkSimRunner$$Lambda$31/1961138094@782fd504)
>         - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
>         - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
>     at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
>
> Am I doing this right?
>
> --
> Thanks,
> -Utkarsh

--
Thanks,
-Utkarsh
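A common workaround for this kind of NotSerializableException (a sketch, not the resolution confirmed in this thread): mark the evaluator field transient and rebuild it lazily on first use, so Spark's closure serializer never sees the non-serializable PMML object graph, and each deserialized copy re-parses the model on the executor. To keep the sketch self-contained and runnable, `ExpensiveModel` below is a hypothetical stand-in for `RegressionModelEvaluator`, and plain Java serialization stands in for Spark shipping the closure:

```java
import java.io.*;

public class PredictionSketch implements Serializable {

    // Hypothetical stand-in for the non-serializable JPMML evaluator.
    static class ExpensiveModel {
        double evaluate(double x) { return 2.0 * x + 1.0; }
    }

    // transient: skipped by Java serialization, so the enclosing
    // object (and the closure that captures it) stays serializable.
    private transient ExpensiveModel rme;

    private ExpensiveModel model() {
        if (rme == null) {
            // In the real class this is where JAXBUtil.unmarshalPMML(...)
            // would re-run on the executor; here we build the stand-in.
            rme = new ExpensiveModel();
        }
        return rme;
    }

    public double predict(double x) {
        return model().evaluate(x);
    }

    public static void main(String[] args) throws Exception {
        // Simulate what Spark does with the captured object: serialize it,
        // deserialize it on the "executor", then use the lazily rebuilt model.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(new PredictionSketch());
        ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        PredictionSketch shipped = (PredictionSketch) in.readObject();
        System.out.println(shipped.predict(2.0)); // prints 5.0
    }
}
```

The driver code stays as written (`.map(t -> prediction.predict(...))`); the difference is that only a cheap, evaluator-free shell crosses the wire, and the 2-3 second unmarshal happens once per deserialized copy rather than once per record. An equivalent alternative is to skip the captured object entirely and construct the evaluator inside `mapPartitions`, once per partition.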