How can I dynamically register UDFs via reflection?

java.lang.UnsupportedOperationException: Schema for type _$13 is not supported
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:153)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:64)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
    at org.apache.spark.sql.UDFRegistration.register(UDFRegistration.scala:145)
    at com.alibaba.spark.odps.driver.util.Utils$$anon$1.processMatch(Utils.scala:115)
    at io.github.lukehutch.fastclasspathscanner.scanner.ScanSpec$1.lookForMatches(ScanSpec.java:759)
    at io.github.lukehutch.fastclasspathscanner.scanner.ScanSpec.callMatchProcessors(ScanSpec.java:446)
    at io.github.lukehutch.fastclasspathscanner.scanner.Scanner.call(Scanner.java:368)
    at io.github.lukehutch.fastclasspathscanner.scanner.Scanner.call(Scanner.java:59)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

/**
 * Annotation that marks a function class for UDF registration.
 *
 * `registerFunc` looks for this annotation on scanned classes and uses the
 * quoted text of the annotation as the name the function is registered under.
 *
 * @param name the UDF name
 */
final class sparkFunc(val name: String) extends StaticAnnotation

/**
 * Scans the `com.alibaba.spark.odps.driver.functions` package and registers
 * every class annotated with `sparkFunc` as a UDF on the given context.
 *
 * For each annotated class the function name is taken from the first
 * double-quoted substring of the annotation's string form. The class is then
 * instantiated through its no-argument constructor and registered as a one-,
 * two- or three-argument function. Any exception raised while handling a
 * single class is logged and does not stop the scan.
 *
 * NOTE(review): the reported failure
 * `UnsupportedOperationException: Schema for type _$13 is not supported`
 * originates in the `hiveContext.udf.register(funcName, func)` calls below.
 * Presumably `register` derives the UDF's return schema from the static type
 * arguments of `func`; because `func` is typed `FunctionN[_, ...]`, those are
 * wildcards with no schema. Fixing this needs the concrete return type of each
 * function class (e.g. a register overload taking an explicit return
 * `DataType`, or statically typed registration) - confirm against the Spark
 * version in use. That change is intentionally not made here.
 *
 * @param hiveContext the context whose UDF registry receives the functions
 */
def registerFunc(hiveContext: HiveContext): Unit = {
    info("register udf function")

    val ru = scala.reflect.runtime.universe
    val classLoaderMirror = ru.runtimeMirror(getClass.getClassLoader)

    new FastClasspathScanner("com.alibaba.spark.odps.driver.functions")
        .matchAllClasses(new ClassMatchProcessor() {
            override def processMatch(aClass: Class[_]): Unit = {
                val classMirror = classLoaderMirror.classSymbol(aClass)
                // Option instead of a null sentinel: None means "class is not annotated".
                val annotation = classMirror.annotations.find(_.tpe =:= ru.typeOf[sparkFunc])

                try {
                    annotation.foreach { found =>
                        // The name lookup may yield null when the annotation text
                        // contains no quoted substring; fail here (the failure is
                        // logged by the catch below) instead of registering a
                        // function under a null name.
                        val funcName =
                            Option(StringUtils.substringBetween(found.toString, "\"", "\""))
                                .getOrElse(throw new RuntimeException(
                                    "cannot extract function name from annotation: " + found))

                        if (chekClazz(aClass, classOf[Function1[_, _]])) {
                            val func: Function1[_, _] =
                                createInstance[Function1[_, _]](aClass).get
                            hiveContext.udf.register(funcName, func)
                        } else if (chekClazz(aClass, classOf[Function2[_, _, _]])) {
                            val func: Function2[_, _, _] =
                                createInstance[Function2[_, _, _]](aClass).get
                            hiveContext.udf.register(funcName, func)
                        } else if (chekClazz(aClass, classOf[Function3[_, _, _, _]])) {
                            val func: Function3[_, _, _, _] =
                                createInstance[Function3[_, _, _, _]](aClass).get
                            hiveContext.udf.register(funcName, func)
                        } else {
                            throw new RuntimeException("not support function")
                        }

                        info("== register function: {}", funcName)
                    }
                } catch {
                    // Best effort: a class that cannot be registered is logged and skipped.
                    case e: Exception => error(e.getMessage, e)
                }
            }
        }).scan()
}

/**
 * Tells whether `sClass` is `pClass` itself or a subtype of it.
 *
 * @param sClass the candidate class
 * @param pClass the expected class or interface
 * @return `true` when `sClass` can be assigned to `pClass`; `false` otherwise,
 *         including when either argument is `null`
 */
private def chekClazz(sClass: Class[_], pClass: Class[_]): Boolean = {
    // Direct assignability check instead of calling asSubclass and catching the
    // resulting exception; the null guards keep the "false on null" outcome.
    sClass != null && pClass != null && pClass.isAssignableFrom(sClass)
}

/**
 * Instantiates `clazz` through its no-argument constructor and returns the
 * instance typed as `T`.
 *
 * The constructor is made accessible first, so non-public no-argument
 * constructors are also used.
 *
 * @tparam T    the type the new instance is expected to conform to
 * @param clazz the class to instantiate
 * @return `Success` with the instance, or `Failure` when the class has no
 *         no-argument constructor, construction fails, or the instance is not
 *         a `T` (a `ClassCastException` in that case)
 */
private def createInstance[T: ClassTag](clazz: Class[_]): Try[T] = {
    Try {
        val constructor = clazz.getDeclaredConstructor()
        constructor.setAccessible(true)
        val obj = constructor.newInstance()
        val t = implicitly[ClassTag[T]].runtimeClass
        if (t.isInstance(obj)) {
            obj.asInstanceOf[T]
        } else {
            throw new ClassCastException(clazz.getName + " is not a subtype of " + t)
        }
    } recover {
        // Unwrap reflection's wrapper so the failure carries the exception the
        // constructor actually threw.
        case i: InvocationTargetException if i.getTargetException ne null =>
            throw i.getTargetException
    }
}

Reply via email to