How can I dynamically register UDFs via reflection? Registration fails with: java.lang.UnsupportedOperationException: Schema for type _$13 is not supported at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:153) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29) at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:64) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29) at org.apache.spark.sql.UDFRegistration.register(UDFRegistration.scala:145) at com.alibaba.spark.odps.driver.util.Utils$$anon$1.processMatch(Utils.scala:115) at io.github.lukehutch.fastclasspathscanner.scanner.ScanSpec$1.lookForMatches(ScanSpec.java:759) at io.github.lukehutch.fastclasspathscanner.scanner.ScanSpec.callMatchProcessors(ScanSpec.java:446) at io.github.lukehutch.fastclasspathscanner.scanner.Scanner.call(Scanner.java:368) at io.github.lukehutch.fastclasspathscanner.scanner.Scanner.call(Scanner.java:59) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
/**
 * Marks a function class so that `registerFunc` picks it up and registers it as a UDF.
 *
 * @param name the name under which the annotated function is registered
 */
final class sparkFunc(val name: String) extends StaticAnnotation {}

/**
 * Scans the package `com.alibaba.spark.odps.driver.functions` and registers every class
 * annotated with [[sparkFunc]] as a UDF on the given context.
 *
 * A matching class must be a `Function1`, `Function2` or `Function3` and must have a
 * no-argument constructor; it is instantiated reflectively via `createInstance`.
 * Registration is best-effort: a non-fatal failure for one class is logged and the scan
 * continues with the next class.
 *
 * NOTE(review): instances are registered through values typed with wildcard arguments
 * (`Function1[_, _]` etc.), so the type information available to `register` at this call
 * site is existential rather than the function's concrete argument/return types. This
 * appears to be the cause of `UnsupportedOperationException: Schema for type _$N is not
 * supported` raised from `UDFRegistration.register`; that failure is caught and logged
 * below, leaving the function unregistered. Fixing it requires supplying the concrete
 * return type -- TODO confirm the right approach for the Spark version in use.
 *
 * @param hiveContext the context whose UDF registry receives the functions
 */
def registerFunc(hiveContext: HiveContext): Unit = {
  import scala.util.control.NonFatal

  info("register udf function")
  val ru = scala.reflect.runtime.universe
  val classLoaderMirror = ru.runtimeMirror(getClass.getClassLoader)

  new FastClasspathScanner("com.alibaba.spark.odps.driver.functions")
    .matchAllClasses(new ClassMatchProcessor() {
      override def processMatch(aClass: Class[_]): Unit = {
        try {
          // The reflection lookup sits inside the try so that a class the Scala mirror
          // cannot handle is logged instead of aborting the whole scan.
          val classSymbol = classLoaderMirror.classSymbol(aClass)
          classSymbol.annotations.find(_.tpe =:= ru.typeOf[sparkFunc]).foreach { annotation =>
            // The UDF name is taken from the first double-quoted literal in the
            // annotation's string form.
            val funcName = StringUtils.substringBetween(annotation.toString, "\"", "\"")

            // Each arity needs its own branch: the `register` overload is selected from
            // the static type of `func`.
            if (chekClazz(aClass, classOf[Function1[_, _]])) {
              val func: Function1[_, _] = createInstance[Function1[_, _]](aClass).get
              hiveContext.udf.register(funcName, func)
            } else if (chekClazz(aClass, classOf[Function2[_, _, _]])) {
              val func: Function2[_, _, _] = createInstance[Function2[_, _, _]](aClass).get
              hiveContext.udf.register(funcName, func)
            } else if (chekClazz(aClass, classOf[Function3[_, _, _, _]])) {
              val func: Function3[_, _, _, _] = createInstance[Function3[_, _, _, _]](aClass).get
              hiveContext.udf.register(funcName, func)
            } else {
              throw new RuntimeException("not support function")
            }
            info("== register function: {}", funcName)
          }
        } catch {
          // NonFatal lets InterruptedException and VM errors propagate to the scanner's
          // worker thread instead of being swallowed here.
          case NonFatal(e) => error(e.getMessage, e)
        }
      }
    }).scan()
}

/**
 * Tells whether `sClass` is `pClass` itself or a subclass/implementation of it.
 *
 * @param sClass the candidate class
 * @param pClass the expected superclass or interface
 * @return `true` if a value of `sClass` can be assigned to `pClass`
 */
private def chekClazz(sClass: Class[_], pClass: Class[_]): Boolean =
  pClass.isAssignableFrom(sClass)

/**
 * Instantiates `clazz` through its no-argument constructor (made accessible if needed)
 * and returns the instance typed as `T`.
 *
 * @tparam T the expected runtime type of the created instance
 * @param clazz the class to instantiate
 * @return `Success` with the instance; `Failure` if construction fails or the instance
 *         is not a `T`. When the constructor itself throws, the failure carries the
 *         constructor's exception rather than the reflective wrapper.
 */
private def createInstance[T: ClassTag](clazz: Class[_]): Try[T] = {
  Try {
    val constructor = clazz.getDeclaredConstructor()
    constructor.setAccessible(true)
    val obj = constructor.newInstance()
    val t = implicitly[ClassTag[T]].runtimeClass
    if (t.isInstance(obj)) obj.asInstanceOf[T]
    else throw new ClassCastException(s"${clazz.getName} is not a subtype of $t")
  } recover {
    // Unwrap the reflective wrapper; `recover` turns the rethrown non-fatal cause into
    // the resulting Failure.
    case i: InvocationTargetException if i.getTargetException ne null =>
      throw i.getTargetException
  }
}