Dear all,

Here is some example code that reproduces the issue I mentioned in a
previous mail about saving a UserDefinedType to a Parquet file. The code
works when I run it inside IntelliJ IDEA, but fails when I build an
assembly jar and run it with spark-submit. I am using the master version
of Spark.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[MyDenseVectorUDT])
class MyDenseVector(val data: Array[Double]) extends Serializable {
  override def equals(other: Any): Boolean = other match {
    case v: MyDenseVector => java.util.Arrays.equals(this.data, v.data)
    case _ => false
  }
}

class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] {
  override def sqlType: DataType = ArrayType(DoubleType, containsNull = false)

  override def serialize(obj: Any): Seq[Double] = {
    obj match {
      case features: MyDenseVector => features.data.toSeq
    }
  }

  override def deserialize(datum: Any): MyDenseVector = {
    datum match {
      case data: Seq[_] => new MyDenseVector(data.asInstanceOf[Seq[Double]].toArray)
    }
  }

  override def userClass: Class[MyDenseVector] = classOf[MyDenseVector]
}

case class Toto(imageAnnotation: MyDenseVector)

object TestUserDefinedType {

  case class Params(input: String = null,
                    partitions: Int = 12,
                    outputDir: String = "images.parquet")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ImportImageFolder").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val rawImages = sc.parallelize((1 to 5)
      .map(x => Toto(new MyDenseVector(Array[Double](x.toDouble))))).toDF

    rawImages.printSchema()
    rawImages.show()

    rawImages.save("toto.parquet") // This fails with assembly jar

    sc.stop()
  }
}

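For completeness, this is roughly how I would read the file back to check
the UDT column once the save works. It is just a sketch, not part of the
failing example; the path is the same "toto.parquet" as above and the
default data source is Parquet:

val restored = sqlContext.load("toto.parquet")
restored.printSchema()
restored.show()
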

My build.sbt is as follows:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql"   % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion
)

assemblyMergeStrategy in assembly := {
  case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
  case PathList("org", "apache", xs @ _*)    => MergeStrategy.first
  case PathList("org", "jboss", xs @ _*)     => MergeStrategy.first
//  case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
//  case "application.conf" => MergeStrategy.concat
  case m if m.startsWith("META-INF") => MergeStrategy.discard
  //case x =>
  //  val oldStrategy = (assemblyMergeStrategy in assembly).value
  //  oldStrategy(x)
  case _ => MergeStrategy.first
}


As I said, this code works without problems when I run it inside IntelliJ
IDEA. But when I generate the assembly jar with sbt-assembly and run it
with spark-submit, I get the following error:

15/04/17 09:34:01 INFO ParquetOutputFormat: Writer version is: PARQUET_1_0
15/04/17 09:34:01 ERROR Executor: Exception in task 3.0 in stage 2.0 (TID 7)
java.lang.IllegalArgumentException: Unsupported dataType:
{"type":"struct","fields":[{"name":"imageAnnotation","type":{"type":"udt","class":"MyDenseVectorUDT","pyClass":null,"sqlType":{"type":"array","elementType":"double","containsNull":false}},"nullable":true,"metadata":{}}]},
[1.1] failure: `TimestampType' expected but `{' found

{"type":"struct","fields":[{"name":"imageAnnotation","type":{"type":"udt","class":"MyDenseVectorUDT","pyClass":null,"sqlType":{"type":"array","elementType":"double","containsNull":false}},"nullable":true,"metadata":{}}]}
^
        at org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163)
        at org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98)
        at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
        at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
        at scala.util.Try.getOrElse(Try.scala:77)
        at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromString(ParquetTypes.scala:402)
        at org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:145)
        at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278)
        at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
        at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:694)
        at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:716)
        at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:716)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)


The issue seems to be related to how the assembly jar is generated, but I
just can't figure out how to fix it. Any ideas would be helpful.
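
One thing I am wondering about is whether discarding everything under
META-INF in my merge strategy is too aggressive, since that also drops the
META-INF/services registration files. A more conservative variant would
look roughly like this (just a guess on my part, I have not verified that
it changes anything):

assemblyMergeStrategy in assembly := {
  case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
  case PathList("org", "apache", xs @ _*)    => MergeStrategy.first
  case PathList("org", "jboss", xs @ _*)     => MergeStrategy.first
  // Keep service-loader registrations instead of discarding all of META-INF.
  case PathList("META-INF", "services", xs @ _*) => MergeStrategy.filterDistinctLines
  case m if m.startsWith("META-INF") => MergeStrategy.discard
  case _ => MergeStrategy.first
}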

Best regards,


Jao
