[ https://issues.apache.org/jira/browse/SPARK-16571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Björn-Elmar Macek updated SPARK-16571:
--------------------------------------
Description:

When executing the following code, an exception is thrown.

{code}
val finalProbabilityProxiesDF = sqlc.sql(app2AgeNormalissationQuery(transformedAppsCol,
  bucketedAgeCol, probProxyCol, normFacCol, ageFeatureRawTable,
  normFactorsTableName, ageApp2AgeProxyTableName)).repartition(10)

// sort the stats
val finalFeatMap = finalProbabilityProxiesDF.select(transformedAppsCol, probProxyCol).map { row =>
  val probs = row.getAs[mutable.WrappedArray[util.ArrayList[Double]]](1)
    .map(array => (array.get(0), array.get(1))).toArray
  val bucketsExist = probs.map(_._1)
  val allBuckets = ageCol match {
    case "label" => (0 to ageSplits.size - 1).map(_.toDouble)
    case "age"   => (1 to ageSplits.size - 2).map(_.toDouble)
  }
  val missingBuckets = allBuckets.diff(bucketsExist).map { (_, 0.0) }
  val fixedProbs = probs ++ missingBuckets
  val filteredFixedProbs = ageCol match {
    case "label" => fixedProbs
    case "age"   => fixedProbs.filter(_._1 != 0.0).map(elem => (elem._1 - 1.0, elem._2))
  }
  val sortedProbs = filteredFixedProbs.sortWith(_._1 < _._1)
  (row.getInt(0), sortedProbs)
}
{code}

The stack trace shows:

{code}
java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.lang.Double
	at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:114)
	at org.apache.spark.sql.catalyst.util.GenericArrayData.getDouble(GenericArrayData.scala:53)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
{code}

If I remove the repartition from finalProbabilityProxiesDF, the code runs without problems. I am unsure about the reason, though. This should not happen, should it?
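For context, here is a minimal, self-contained sketch of the access pattern involved, assuming a local Spark 1.6.x setup; the table data and the column names "app" and "probs" are made up for illustration and do not come from the report. Spark SQL materializes an array<array<double>> column as nested Scala Seqs (WrappedArray), so the sketch reads the column as Seq[Seq[Double]] rather than java.util.ArrayList:

{code}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Minimal sketch (hypothetical names, Spark 1.6.x assumed).
object Spark16571Sketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SPARK-16571").setMaster("local[2]"))
    val sqlc = new SQLContext(sc)
    import sqlc.implicits._

    // A column of type array<array<double>>, analogous to probProxyCol above.
    val df = sc.parallelize(Seq(
      (1, Seq(Seq(0.0, 0.4), Seq(1.0, 0.6))),
      (2, Seq(Seq(1.0, 1.0)))
    )).toDF("app", "probs").repartition(10) // mirrors the shuffle the report identifies as the trigger

    // Array columns come back from Spark SQL as Scala Seq (WrappedArray),
    // so Seq[Seq[Double]] is a safer element type than java.util.ArrayList.
    val sorted = df.map { row =>
      val probs = row.getAs[Seq[Seq[Double]]](1).map(a => (a(0), a(1)))
      (row.getInt(0), probs.sortBy(_._1))
    }

    sorted.collect().foreach(println)
    sc.stop()
  }
}
{code}

Whether reading the nested column this way sidesteps the ClassCastException in the reporter's environment is untested here; the repartition(10) is kept only to reproduce the shuffle path that the stack trace points at (BypassMergeSortShuffleWriter via an UnsafeProjection).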
> DataFrame repartition leads to unexpected error during shuffle
> --------------------------------------------------------------
>
>                 Key: SPARK-16571
>                 URL: https://issues.apache.org/jira/browse/SPARK-16571
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle
>    Affects Versions: 1.6.1
>            Reporter: Björn-Elmar Macek
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org