We have a feature engineering transformer defined as a custom class with a UDF, as follows:

import java.util.UUID.randomUUID

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{DataTypes, DoubleType, IntegerType, StructType}

class FeatureModder extends Transformer with DefaultParamsWritable with DefaultParamsReadable[FeatureModder] {
    val uid: String = "FeatureModder" + randomUUID

    final val inputCol: Param[String] = new Param[String](this, "inputCol", "input column")
    final def setInputCol(value: String) = set(inputCol, value)

    final val outputCol: Param[String] = new Param[String](this, "outputCol", "output column")
    final def setOutputCol(value: String) = set(outputCol, value)

    final val size: Param[String] = new Param[String](this, "size", "length of output vector")
    final def setSize(n: Int) = set(size, n.toString)

    // compute inputCol modulo `size` and store the result in outputCol
    override def transform(data: Dataset[_]): DataFrame = {
        val modUDF = udf { n: Int => n % $(size).toInt }
        data.withColumn($(outputCol), modUDF(col($(inputCol)).cast(IntegerType)))
    }

    override def transformSchema(schema: StructType): StructType = {
        val actualType = schema($(inputCol)).dataType
        require(actualType.equals(IntegerType) || actualType.equals(DoubleType), "Input column must be of numeric type")
        DataTypes.createStructType(schema.fields :+ DataTypes.createStructField($(outputCol), IntegerType, false))
    }

    override def copy(extra: ParamMap): Transformer = defaultCopy(extra)
}

This transformer was included in an ML pipeline, which was fitted into a model and persisted to a disk file. When we try to load the pipeline model in a separate notebook (we use Zeppelin), an exception is thrown complaining that the class was not found.
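
For reference, the save and load steps look roughly like this (the stage parameters, data frame, and path below are illustrative rather than our exact code):

import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}

// notebook 1: fit the pipeline and persist the fitted model
val pipeline = new Pipeline().setStages(Array[PipelineStage](
    new FeatureModder().setInputCol("id").setOutputCol("idMod").setSize(10)
))
val model = pipeline.fit(trainingDF)   // trainingDF stands in for our training DataFrame
model.write.overwrite().save("/tmp/feature-pipeline")

// notebook 2 (a separate Zeppelin notebook): reload the persisted model
val reloaded = PipelineModel.load("/tmp/feature-pipeline")

The PipelineModel.load call is the one that fails: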

java.lang.ClassNotFoundException: $line103090609224.$read$FeatureModder
    at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:72)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:398)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:207)
    at org.apache.spark.ml.util.DefaultParamsReader$.loadParamsInstanceReader(ReadWrite.scala:630)
    at org.apache.spark.ml.Pipeline$SharedReadWrite$.$anonfun$load$4(Pipeline.scala:276)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
    at org.apache.spark.ml.Pipeline$SharedReadWrite$.$anonfun$load$3(Pipeline.scala:274)
    at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
    at org.apache.spark.ml.Pipeline$SharedReadWrite$.load(Pipeline.scala:268)
    at org.apache.spark.ml.PipelineModel$PipelineModelReader.$anonfun$load$7(Pipeline.scala:356)
    at org.apache.spark.ml.MLEvents.withLoadInstanceEvent(events.scala:160)
    at org.apache.spark.ml.MLEvents.withLoadInstanceEvent$(events.scala:155)
    at org.apache.spark.ml.util.Instrumentation.withLoadInstanceEvent(Instrumentation.scala:42)
    at org.apache.spark.ml.PipelineModel$PipelineModelReader.$anonfun$load$6(Pipeline.scala:355)
    at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
    at org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:355)
    at org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:349)
    at org.apache.spark.ml.util.MLReadable.load(ReadWrite.scala:355)
    at org.apache.spark.ml.util.MLReadable.load$(ReadWrite.scala:355)
    at org.apache.spark.ml.PipelineModel$.load(Pipeline.scala:337)
    ... 40 elided

Could someone help explain why? My guess is that the class definition is not on the classpath. The question is: how do we include the class definition or class metadata as part of the pipeline model serialization, or otherwise make it available to the notebook? (We did already include the class definition in the notebook that loads the pipeline model.)

Thanks a lot in advance for your help!

ND
