We have a feature engineering transformer defined as a custom class with a UDF, as follows:
import java.util.UUID.randomUUID

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{DataTypes, DoubleType, IntegerType, StructType}

class FeatureModder extends Transformer
    with DefaultParamsWritable with DefaultParamsReadable[FeatureModder] {

  val uid: String = "FeatureModder" + randomUUID

  final val inputCol: Param[String] = new Param[String](this, "inputCol", "input column")
  final def setInputCol(value: String): this.type = set(inputCol, value)

  final val outputCol: Param[String] = new Param[String](this, "outputCol", "output column")
  final def setOutputCol(value: String): this.type = set(outputCol, value)

  final val size: Param[String] = new Param[String](this, "size", "length of output vector")
  final def setSize(n: Int): this.type = set(size, n.toString)

  // Appends outputCol = inputCol mod size (the input is cast to integer first).
  override def transform(data: Dataset[_]): DataFrame = {
    val modUDF = udf { n: Int => n % $(size).toInt }
    data.withColumn($(outputCol), modUDF(col($(inputCol)).cast(IntegerType)))
  }

  override def transformSchema(schema: StructType): StructType = {
    val actualType = schema($(inputCol)).dataType
    require(actualType.equals(IntegerType) || actualType.equals(DoubleType),
      "Input column must be of numeric type")
    DataTypes.createStructType(schema.fields :+
      DataTypes.createStructField($(outputCol), IntegerType, false))
  }

  override def copy(extra: ParamMap): Transformer = defaultCopy(extra)
}
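For context, the transformer behaves as expected when applied directly; something along these lines (toy data and column names here are just for illustration):

import spark.implicits._

val df = Seq(3, 7, 12).toDF("id")   // IntegerType column

val modder = new FeatureModder()
  .setInputCol("id")
  .setOutputCol("idMod")
  .setSize(5)

modder.transform(df).show()
// +---+-----+
// | id|idMod|
// +---+-----+
// |  3|    3|
// |  7|    2|
// | 12|    2|
// +---+-----+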
This was included in an ML pipeline, fitted into a model, and persisted to a disk file.
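Roughly like this (the training data, column names, and output path below are placeholders, not the real ones):

import org.apache.spark.ml.Pipeline
import spark.implicits._

val training = Seq(1, 2, 3, 4, 5).toDF("id")   // placeholder training data

val modder = new FeatureModder()
  .setInputCol("id")
  .setOutputCol("idMod")
  .setSize(10)

val pipeline = new Pipeline().setStages(Array(modder))
val model    = pipeline.fit(training)

model.write.overwrite().save("/tmp/feature_pipeline_model")   // placeholder path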
When we try to load the pipeline model in a separate notebook (we use Zeppelin), an exception is thrown complaining that the class cannot be found:
java.lang.ClassNotFoundException: $line103090609224.$read$FeatureModder
    at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:72)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:398)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:207)
    at org.apache.spark.ml.util.DefaultParamsReader$.loadParamsInstanceReader(ReadWrite.scala:630)
    at org.apache.spark.ml.Pipeline$SharedReadWrite$.$anonfun$load$4(Pipeline.scala:276)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
    at org.apache.spark.ml.Pipeline$SharedReadWrite$.$anonfun$load$3(Pipeline.scala:274)
    at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
    at org.apache.spark.ml.Pipeline$SharedReadWrite$.load(Pipeline.scala:268)
    at org.apache.spark.ml.PipelineModel$PipelineModelReader.$anonfun$load$7(Pipeline.scala:356)
    at org.apache.spark.ml.MLEvents.withLoadInstanceEvent(events.scala:160)
    at org.apache.spark.ml.MLEvents.withLoadInstanceEvent$(events.scala:155)
    at org.apache.spark.ml.util.Instrumentation.withLoadInstanceEvent(Instrumentation.scala:42)
    at org.apache.spark.ml.PipelineModel$PipelineModelReader.$anonfun$load$6(Pipeline.scala:355)
    at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
    at org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:355)
    at org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:349)
    at org.apache.spark.ml.util.MLReadable.load(ReadWrite.scala:355)
    at org.apache.spark.ml.util.MLReadable.load$(ReadWrite.scala:355)
    at org.apache.spark.ml.PipelineModel$.load(Pipeline.scala:337)
    ... 40 elided
Could someone help explain why? My guess is that the class definition is not on the classpath. The question is: how do we include the class definition or class metadata as part of the pipeline model serialization, or otherwise make the class definition available to the notebook that loads the model? (We did include the class definition in the notebook that loads the pipeline model.)
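Our suspicion is that the class name, rather than the definition, is the problem: a class defined in a notebook cell gets a REPL-generated wrapper name, so the copy pasted into the loading notebook ends up with a different name from the $line103090609224.$read$FeatureModder recorded in the saved metadata. A quick check along these lines (illustrative only):

// run in the notebook that loads the model, after pasting the class definition
println(classOf[FeatureModder].getName)
// prints something like $lineNNNNN.$read$FeatureModder -- a different $line prefix
// from the one in the exception above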
Thanks a lot in advance for your help!
ND