This is a very interesting issue. The root cause of the lower performance is probably that, for a Scala UDF, Spark SQL converts the data from its internal representation to the Scala representation recursively, via Scala reflection.
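To make the suspected overhead concrete, here is a rough toy model in plain Scala. It is not Spark's actual code: `convertToScala`, `invokeGeneric` and `inline` are made-up names, and the converter uses a simple pattern match rather than reflection. It only illustrates the general idea that a generic UDF call does extra per-row work (boxing, conversion, casting) around the same arithmetic that an inline expression performs directly.

```scala
// Illustrative only: a toy model of a generic UDF call path, not Spark's code.
object UdfOverheadSketch {

  // The function registered as "floor" in the test case.
  val floor300: Int => Int = ts => ts - ts % 300

  // Hypothetical converter: walks a value recursively before handing it to
  // user code. Nested sequences are converted element by element.
  def convertToScala(value: Any): Any = value match {
    case seq: Seq[_] => seq.map(convertToScala)
    case other       => other
  }

  // Hypothetical generic call path: the argument arrives boxed as Any,
  // is converted, cast, passed to the function, and the result is boxed again.
  def invokeGeneric(f: Int => Int, arg: Any): Any =
    f(convertToScala(arg).asInstanceOf[Int])

  // Inline path: the same arithmetic on a primitive Int, with no conversion.
  def inline(ts: Int): Int = ts - ts % 300

  def main(args: Array[String]): Unit = {
    val samples = Seq(0, 299, 300, 301, 599)
    for (ts <- samples) {
      // Both paths must agree on the value; only the per-row work differs.
      assert(invokeGeneric(floor300, ts) == inline(ts))
    }
    println(samples.map(inline).mkString(", "))
  }
}
```

The sketch says nothing about real timings; measuring the two queries in Spark itself is the only reliable way to confirm where the time goes.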
Can you create a Jira issue for tracking this? I can start to work on the improvement soon.

From: zzcclp [mailto:441586...@qq.com]
Sent: Monday, March 23, 2015 5:10 PM
To: user@spark.apache.org
Subject: Spark SQL udf(ScalaUdf) is very slow

My test env:
1. Spark version is 1.3.0
2. 3 node per 80G/20C
3. read 250G parquet files from hdfs

Test case:
1. register "floor" func with command: sqlContext.udf.register("floor", (ts: Int) => ts - ts % 300), then run with sql "select chan, floor(ts) as tt, sum(size) from qlogbase3 group by chan, floor(ts)", it takes 17 minutes.
   == Physical Plan ==
   Aggregate false, [chan#23015,PartialGroup#23500], [chan#23015,PartialGroup#23500 AS tt#23494,CombineSum(PartialSum#23499L) AS c2#23495L]
    Exchange (HashPartitioning [chan#23015,PartialGroup#23500], 54)
     Aggregate true, [chan#23015,scalaUDF(ts#23016)], [chan#23015,scalaUDF(ts#23016) AS PartialGroup#23500,SUM(size#23023L) AS PartialSum#23499L]
      PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[115] at map at newParquet.scala:562

2. run with sql "select chan, (ts - ts % 300) as tt, sum(size) from qlogbase3 group by chan, (ts - ts % 300)", it takes only 5 minutes.
   == Physical Plan ==
   Aggregate false, [chan#23015,PartialGroup#23349], [chan#23015,PartialGroup#23349 AS tt#23343,CombineSum(PartialSum#23348L) AS c2#23344L]
    Exchange (HashPartitioning [chan#23015,PartialGroup#23349], 54)
     Aggregate true, [chan#23015,(ts#23016 - (ts#23016 % 300))], [chan#23015,(ts#23016 - (ts#23016 % 300)) AS PartialGroup#23349,SUM(size#23023L) AS PartialSum#23348L]
      PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[83] at map at newParquet.scala:562

3. use HiveContext with sql "select chan, floor((ts - ts % 300)) as tt, sum(size) from qlogbase3 group by chan, floor((ts - ts % 300))" it takes only 5 minutes too.
   == Physical Plan ==
   Aggregate false, [chan#23015,PartialGroup#23108L], [chan#23015,PartialGroup#23108L AS tt#23102L,CombineSum(PartialSum#23107L) AS _c2#23103L]
    Exchange (HashPartitioning [chan#23015,PartialGroup#23108L], 54)
     Aggregate true, [chan#23015,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor((ts#23016 - (ts#23016 % 300)))], [chan#23015,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor((ts#23016 - (ts#23016 % 300))) AS PartialGroup#23108L,SUM(size#23023L) AS PartialSum#23107L]
      PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[28] at map at newParquet.scala:562

Why? ScalaUdf is so slow?? How to improve it?