Hi Cheng,

Yes, some rows contain unit values instead of decimals. I believe some rows in the original source I pulled from have no value at all, i.e. they are null, and that is what shows up as unit. How do Spark SQL and Parquet handle a null in place of a decimal value, assuming the field is nullable? I will have to fix the conversion accordingly.
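As a rough sketch (assuming the missing values really do arrive as unit, i.e. (), inside the Array[Object] read back from the object file, and reusing the paths and names from the code quoted below), the placeholders could be mapped to nulls before the Rows are built, so a nullable DecimalType column receives a null rather than a BoxedUnit:

import org.apache.spark.sql.Row

// Hypothetical cleanup step, not the actual fix: replace unit placeholders
// with nulls so that nullable columns (including DecimalType) get SQL nulls
// instead of scala.runtime.BoxedUnit values.
val cleanedRows = sc.objectFile[Array[Object]]("/home/bipin/rawdata/" + name)
  .map { arr =>
    Row(arr.map {
      case _: scala.runtime.BoxedUnit => null   // missing value -> null
      case other                      => other
    }: _*)
  }

val cleaned = sqlContext.createDataFrame(cleanedRows, myschema)
cleaned.saveAsParquetFile("/home/bipin/stageddata/" + name)

As far as I know, Parquet stores a null in an optional (nullable) column simply by omitting the value for that row, and Spark's Parquet writer skips null fields, so genuine nulls in a nullable decimal column should write fine; the ClassCastException appears because the placeholder is a unit value rather than a null.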
Thanks for helping out.
Bipin

On 12 June 2015 at 14:57, Cheng Lian <lian.cs....@gmail.com> wrote:

> On 6/10/15 8:53 PM, Bipin Nag wrote:
>
> Hi Cheng,
>
> I am using the Spark 1.3.1 binary available for Hadoop 2.6. I am loading
> an existing Parquet file, then repartitioning and saving it. Doing this
> gives the error. The code for this doesn't look like the cause of the
> problem; I have a feeling the source, the existing Parquet file, is the
> culprit.
>
> I created that Parquet file using a JdbcRDD (pulled from Microsoft SQL
> Server). First I saved the JdbcRDD as an object file on disk. Then I
> loaded it again, made a DataFrame from it using a schema, and saved it
> as Parquet.
>
> Following is the code.
>
> For saving the JdbcRDD:
>   name   - fully qualified table name
>   pk     - string for primary key
>   lastpk - last id to pull
>
> val myRDD = new JdbcRDD( sc, () =>
>     DriverManager.getConnection(url,username,password) ,
>   "SELECT * FROM " + name + " WITH (NOLOCK) WHERE ? <= "+pk+" and "+pk+" <= ?",
>   1, lastpk, 1, JdbcRDD.resultSetToObjectArray)
> myRDD.saveAsObjectFile("rawdata/"+name);
>
> For applying the schema and saving the Parquet file:
>
> val myschema = schemamap(name)
> val myrdd =
>   sc.objectFile[Array[Object]]("/home/bipin/rawdata/"+name).map(x =>
>     org.apache.spark.sql.Row(x:_*))
>
> Have you tried to print out x here to check its contents? My guess is
> that x actually contains unit values. For example, the following Spark
> shell code can reproduce a similar exception:
>
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.Row
>
> val schema = StructType(StructField("dec", DecimalType(10, 0)) :: Nil)
> val rdd = sc.parallelize(1 to 10).map(_ => Array(())).map(arr => Row(arr: _*))
> val df = sqlContext.createDataFrame(rdd, schema)
>
> df.saveAsParquetFile("file:///tmp/foo")
>
> val actualdata = sqlContext.createDataFrame(myrdd, myschema)
> actualdata.saveAsParquetFile("/home/bipin/stageddata/"+name)
>
> The schema StructType can be made manually, though I pull the table's
> metadata and build one. It is a simple string translation (see the SQL
> Server docs
> <https://msdn.microsoft.com/en-us/library/ms378878%28v=sql.110%29.aspx>
> and/or the Spark data types
> <https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#data-types>);
> a sketch of such a translation is included at the end of this thread.
>
> That is how I created the Parquet file. Any help to solve the issue is
> appreciated.
> Thanks
> Bipin
>
> On 9 June 2015 at 20:44, Cheng Lian <lian.cs....@gmail.com> wrote:
>
>> Would you please provide a snippet that reproduces this issue? What
>> version of Spark were you using?
>>
>> Cheng
>>
>> On 6/9/15 8:18 PM, bipin wrote:
>>
>>> Hi,
>>> When I try to save my data frame as a parquet file I get the following
>>> error:
>>>
>>> java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to
>>> org.apache.spark.sql.types.Decimal
>>>     at org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:220)
>>>     at org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192)
>>>     at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171)
>>>     at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134)
>>>     at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
>>>     at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
>>>     at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
>>>     at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$writeShard$1(newParquet.scala:671)
>>>     at org.apache.spark.sql.parquet.ParquetRelation2$anonfun$insert$2.apply(newParquet.scala:689)
>>>     at org.apache.spark.sql.parquet.ParquetRelation2$anonfun$insert$2.apply(newParquet.scala:689)
>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> How to fix this problem?
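Regarding the "simple string translation" from SQL Server column types to Spark SQL data types mentioned earlier in the thread, a minimal sketch of that kind of mapping could look like the following (the type names covered, the helper names, and the defaults are illustrative assumptions, not the actual schemamap implementation):

import org.apache.spark.sql.types._

// Illustrative only: map a few common SQL Server type names to Spark SQL DataTypes.
// A real mapping would take precision/scale and nullability from the table metadata.
def toSparkType(sqlServerType: String, precision: Int = 10, scale: Int = 0): DataType =
  sqlServerType.toLowerCase match {
    case "int"                            => IntegerType
    case "bigint"                         => LongType
    case "bit"                            => BooleanType
    case "float"                          => DoubleType
    case "decimal" | "numeric" | "money"  => DecimalType(precision, scale)
    case "datetime" | "smalldatetime"     => TimestampType
    case "date"                           => DateType
    case _                                => StringType   // varchar, nvarchar, text, ...
  }

// Build a StructType from (columnName, typeName, nullable) metadata tuples.
def buildSchema(columns: Seq[(String, String, Boolean)]): StructType =
  StructType(columns.map { case (colName, typeName, nullable) =>
    StructField(colName, toSparkType(typeName), nullable)
  })

With per-column precision, scale and nullability pulled from the table metadata, the resulting StructType can be passed to sqlContext.createDataFrame exactly as in the code above.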