I have two separate but similar issues that I've narrowed down in reasonable detail. I'm using Spark 1.6.3, specifically Spark SQL.
I'm concerned with a single dataset for now, although the details apply to other, larger datasets. I'll call it "table". It's around 160 million records, averaging 78 bytes each, so roughly 12 GB uncompressed; it's 2 GB compressed in HDFS.

First issue:

The following query works when "table" consists of 200 partitions (on disk), but fails when "table" has 1200 partitions, with the error "Total size of serialized results of 1031 tasks (6.0 GB) is bigger than spark.driver.maxResultSize (6.0 GB)":

    SELECT * FROM orc.`table` ORDER BY field DESC LIMIT 100000;

This is possibly related to the TakeOrderedAndProject step in the execution plan, because the following queries do not give me problems:

    SELECT * FROM orc.`table`;
    SELECT * FROM orc.`table` ORDER BY field DESC;
    SELECT * FROM orc.`table` LIMIT 100000;

All of these have different execution plans. My "table" has 1200 partitions because I must use a large value for spark.sql.shuffle.partitions to handle joins and window functions on much larger DataFrames in my application. Too many partitions may be suboptimal, but it shouldn't lead to large serialized results, correct? Any ideas? I've seen https://issues.apache.org/jira/browse/SPARK-12837, but I think my issue is a bit more specific.

Second issue:

Execution differs when calling .cache() and .count() on the following two DataFrames:

    A: sqlContext.sql("SELECT * FROM table")
    B: sqlContext.sql("SELECT * FROM table ORDER BY field DESC")

Counting the rows of A works as expected: a single Spark job with two stages. Load from Hadoop, map, aggregate, reduce to a number.

The same can't be said for B, however. The .cache() call spawns a Spark job before I even call .count(), loading from HDFS and performing ConvertToSafe and Exchange. The .count() call spawns another job, the first task of which appears to re-load from HDFS and again perform ConvertToSafe and Exchange, writing 1200 shuffle partitions. The next stage then reads the shuffle data across only 2 tasks. One of these tasks completes immediately; the other runs indefinitely and eventually fails because its partition is too large (the "java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE" error).

Does this behavior make sense at all? Obviously it doesn't make sense to sort rows if I'm just counting them, but this is a simplified example of a more complex application in which caching makes sense. My executors have more than enough memory to cache this entire DataFrame.

Thanks for reading,

---
Joe Naegele
Grier Forensics
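
P.S. In case a concrete sketch helps, below is roughly what I'm running, boiled down. The path, table and field names, the HiveContext setup, and the final collect() are stand-ins rather than my exact code:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    // Placeholder setup -- in my real application the large
    // spark.sql.shuffle.partitions value is needed for joins and window
    // functions on much bigger DataFrames, and maxResultSize is already 6g.
    val conf = new SparkConf()
      .setAppName("sort-limit-repro")
      .set("spark.sql.shuffle.partitions", "1200")
      .set("spark.driver.maxResultSize", "6g")
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)

    // Issue 1: works when the table has 200 partitions on disk, but with
    // 1200 partitions it fails with "Total size of serialized results of
    // 1031 tasks (6.0 GB) is bigger than spark.driver.maxResultSize (6.0 GB)".
    val top = sqlContext.sql(
      "SELECT * FROM orc.`/path/to/table` ORDER BY field DESC LIMIT 100000")
    top.collect()  // stand-in for whatever action materializes the result

    // Issue 2: A counts fine; B spawns a job on .cache() itself, and .count()
    // then re-scans, shuffles into 1200 partitions, and reads them back with
    // only 2 tasks, one of which eventually fails with
    // "java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE".
    val a = sqlContext.sql("SELECT * FROM table")
    val b = sqlContext.sql("SELECT * FROM table ORDER BY field DESC")

    a.cache()
    a.count()  // one job, two stages, as expected

    b.cache()  // already triggers a Spark job (scan + ConvertToSafe + Exchange)
    b.count()  // second job re-loads and shuffles, then the 2-task read stage fails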