Dear all,

Here is an example of code that reproduces the issue I mentioned in a previous mail about saving a UserDefinedType to a Parquet file. The code works when I run it inside IntelliJ IDEA, but it fails when I build the assembly jar and run it with spark-submit. I am using the master version of Spark.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[MyDenseVectorUDT])
class MyDenseVector(val data: Array[Double]) extends Serializable {
  override def equals(other: Any): Boolean = other match {
    case v: MyDenseVector => java.util.Arrays.equals(this.data, v.data)
    case _ => false
  }
}

class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] {
  override def sqlType: DataType = ArrayType(DoubleType, containsNull = false)

  override def serialize(obj: Any): Seq[Double] = obj match {
    case features: MyDenseVector => features.data.toSeq
  }

  override def deserialize(datum: Any): MyDenseVector = datum match {
    case data: Seq[_] => new MyDenseVector(data.asInstanceOf[Seq[Double]].toArray)
  }

  override def userClass: Class[MyDenseVector] = classOf[MyDenseVector]
}

case class Toto(imageAnnotation: MyDenseVector)

object TestUserDefinedType {

  case class Params(
      input: String = null,
      partitions: Int = 12,
      outputDir: String = "images.parquet")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ImportImageFolder").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    import sqlContext.implicits._

    val rawImages = sc.parallelize(
      (1 to 5).map(x => Toto(new MyDenseVector(Array[Double](x.toDouble))))).toDF

    rawImages.printSchema()
    rawImages.show()
    rawImages.save("toto.parquet") // This fails with assembly jar

    sc.stop()
  }
}

My build.sbt is as follows:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql"   % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion)

assemblyMergeStrategy in assembly := {
  case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
  case PathList("org", "apache", xs @ _*)    => MergeStrategy.first
  case PathList("org", "jboss", xs @ _*)     => MergeStrategy.first
//  case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
//  case "application.conf" => MergeStrategy.concat
  case m if m.startsWith("META-INF") => MergeStrategy.discard
//  case x =>
//    val oldStrategy = (assemblyMergeStrategy in assembly).value
//    oldStrategy(x)
  case _ => MergeStrategy.first
}

As I said, this code works without problem when I execute it inside IntelliJ IDEA.
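In case it helps narrow things down, here is a minimal sanity check of the UDT in isolation (my own addition for illustration, not part of the program above; the UdtRoundTrip name is made up). It exercises serialize/deserialize directly, with no Spark context or Parquet involved, and I would expect it to pass given that the full program runs fine inside IntelliJ IDEA:

// Hypothetical standalone check of the UDT round trip,
// independent of Spark SQL and Parquet.
object UdtRoundTrip {
  def main(args: Array[String]): Unit = {
    val udt = new MyDenseVectorUDT
    val original = new MyDenseVector(Array(1.0, 2.0, 3.0))

    // serialize maps the vector to the Seq[Double] that backs the sqlType array
    val serialized = udt.serialize(original)

    // deserialize rebuilds a MyDenseVector from that Seq
    val restored = udt.deserialize(serialized)

    // equals compares the underlying arrays element by element
    assert(original == restored, "UDT round trip failed")
    println("UDT round trip OK: " + serialized)
  }
}

If this check passes while the spark-submit run still fails, that would point away from the UDT logic itself.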
But when I generate the assembly jar with sbt-assembly and use spark-submit, I get the following error:

15/04/17 09:34:01 INFO ParquetOutputFormat: Writer version is: PARQUET_1_0
15/04/17 09:34:01 ERROR Executor: Exception in task 3.0 in stage 2.0 (TID 7)
java.lang.IllegalArgumentException: Unsupported dataType: {"type":"struct","fields":[{"name":"imageAnnotation","type":{"type":"udt","class":"MyDenseVectorUDT","pyClass":null,"sqlType":{"type":"array","elementType":"double","containsNull":false}},"nullable":true,"metadata":{}}]}, [1.1] failure: `TimestampType' expected but `{' found

{"type":"struct","fields":[{"name":"imageAnnotation","type":{"type":"udt","class":"MyDenseVectorUDT","pyClass":null,"sqlType":{"type":"array","elementType":"double","containsNull":false}},"nullable":true,"metadata":{}}]}
^
    at org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163)
    at org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
    at scala.util.Try.getOrElse(Try.scala:77)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromString(ParquetTypes.scala:402)
    at org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:145)
    at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278)
    at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
    at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:694)
    at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:716)
    at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:716)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

The issue seems to be related to the generation of the assembly jar, but I just can't figure out how to fix it. Any ideas would be helpful.

Best regards,

Jao