Hi Josh,

No, I don't have speculation enabled. The driver ran for a few hours before
it hit the OOM. Interestingly, all partitions were generated successfully
(the _SUCCESS file is written in the output directory). Is there a reason
why the driver needs so much memory? The jstack output showed that it was
calling refresh on some file statuses. Is there a way to keep
OutputCommitCoordinator from using so much memory?

Ultimately, I chose to use partitions because most of my queries will
filter on the partition field. For example, "SELECT events FROM
customer WHERE customer_id = 1234". If the data is partitioned by
customer_id, all events for a customer can be retrieved without
scanning the entire dataset, which should be much more efficient (I hope).
However, the implementation of the partition logic does not seem to
support this type of use case without using a lot of memory, which is
a bit odd in my opinion. Any help will be greatly appreciated.
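
To make the access pattern concrete, here is a minimal sketch in plain
Python. It is not Spark code and not how Spark implements anything; the
helper names (write_partitioned, read_partition, list_leaf_files) and the
sample rows are made up for illustration. It only assumes the
"customer_id=<value>" directory naming that partitionBy produces. Reading
one customer touches a single directory, while a full listing of leaf
files (similar in spirit to the listLeafFiles call in the trace) has to
walk every partition directory.

```python
import os
import tempfile


def write_partitioned(base_dir, rows, column):
    """Append each row's event to a file under a `column=value` subdirectory."""
    for row in rows:
        part_dir = os.path.join(base_dir, "{}={}".format(column, row[column]))
        os.makedirs(part_dir, exist_ok=True)
        with open(os.path.join(part_dir, "part-00000.txt"), "a") as f:
            f.write(row["event"] + "\n")


def read_partition(base_dir, column, value):
    """Read only the directory for one partition value; others are never opened."""
    part_dir = os.path.join(base_dir, "{}={}".format(column, value))
    events = []
    if not os.path.isdir(part_dir):
        return events
    for name in sorted(os.listdir(part_dir)):
        with open(os.path.join(part_dir, name)) as f:
            events.extend(line.rstrip("\n") for line in f)
    return events


def list_leaf_files(base_dir):
    """Walk the whole tree; the work grows with the number of partitions."""
    return [
        os.path.join(root, name)
        for root, _dirs, files in os.walk(base_dir)
        for name in files
    ]


def demo():
    # Hypothetical sample data: two customers, three events.
    rows = [
        {"customer_id": 1234, "event": "login"},
        {"customer_id": 5678, "event": "purchase"},
        {"customer_id": 1234, "event": "logout"},
    ]
    base_dir = tempfile.mkdtemp()
    write_partitioned(base_dir, rows, "customer_id")
    events = read_partition(base_dir, "customer_id", 1234)
    return events, len(list_leaf_files(base_dir))
```

With over a million customer_id values, read_partition still opens one
directory, but anything shaped like list_leaf_files has to visit all of
them, which is the part I suspect is expensive on the driver.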

Best Regards,

Jerry



On Sun, Oct 25, 2015 at 9:25 PM, Josh Rosen <rosenvi...@gmail.com> wrote:

> Hi Jerry,
>
> Do you have speculation enabled? A write which produces one million files
> / output partitions might be using tons of driver memory via the
> OutputCommitCoordinator's bookkeeping data structures.
>
> On Sun, Oct 25, 2015 at 5:50 PM, Jerry Lam <chiling...@gmail.com> wrote:
>
>> Hi spark guys,
>>
>> I think I hit the same issue as SPARK-8890
>> https://issues.apache.org/jira/browse/SPARK-8890. It is marked as
>> resolved, but it does not appear to be. I have over a million output
>> directories for a single column in partitionBy. Could this be a
>> regression? Do I need to set some parameters to make it more memory
>> efficient?
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>>
>>
>> On Sun, Oct 25, 2015 at 8:39 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>
>>> Hi guys,
>>>
>>> After waiting for a day, it actually caused an OOM on the Spark driver.
>>> I configured the driver with 6GB. Note that I didn't call refresh
>>> myself. The method was called when saving the DataFrame in parquet
>>> format. Also, I'm using partitionBy() on the DataFrameWriter to generate
>>> over 1 million files. I'm not sure why the driver runs out of memory
>>> after the job has written _SUCCESS in the output folder.
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>>
>>> On Sat, Oct 24, 2015 at 9:35 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>>
>>>> Hi Spark users and developers,
>>>>
>>>> Has anyone encountered an issue where a Spark SQL job that produces a
>>>> lot of files (over 1 million) hangs in the refresh method? I'm using
>>>> Spark 1.5.1. Below is the stack trace. I can see that the parquet files
>>>> are produced, but the driver is doing something very intensive (it uses
>>>> all the CPUs). Does this mean Spark SQL cannot be used to produce over
>>>> 1 million files in a single job?
>>>>
>>>> Thread 528: (state = BLOCKED)
>>>>  - java.util.Arrays.copyOf(char[], int) @bci=1, line=2367 (Compiled
>>>> frame)
>>>>  - java.lang.AbstractStringBuilder.expandCapacity(int) @bci=43,
>>>> line=130 (Compiled frame)
>>>>  - java.lang.AbstractStringBuilder.ensureCapacityInternal(int) @bci=12,
>>>> line=114 (Compiled frame)
>>>>  - java.lang.AbstractStringBuilder.append(java.lang.String) @bci=19,
>>>> line=415 (Compiled frame)
>>>>  - java.lang.StringBuilder.append(java.lang.String) @bci=2, line=132
>>>> (Compiled frame)
>>>>  - org.apache.hadoop.fs.Path.toString() @bci=128, line=384 (Compiled
>>>> frame)
>>>>  -
>>>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(org.apache.hadoop.fs.FileStatus)
>>>> @bci=4, line=447 (Compiled frame)
>>>>  -
>>>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(java.lang.Object)
>>>> @bci=5, line=447 (Compiled frame)
>>>>  -
>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object)
>>>> @bci=9, line=244 (Compiled frame)
>>>>  -
>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object)
>>>> @bci=2, line=244 (Compiled frame)
>>>>  -
>>>> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
>>>> scala.Function1) @bci=22, line=33 (Compiled frame)
>>>>  - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
>>>> @bci=2, line=108 (Compiled frame)
>>>>  -
>>>> scala.collection.TraversableLike$class.map(scala.collection.TraversableLike,
>>>> scala.Function1, scala.collection.generic.CanBuildFrom) @bci=17, line=244
>>>> (Compiled frame)
>>>>  - scala.collection.mutable.ArrayOps$ofRef.map(scala.Function1,
>>>> scala.collection.generic.CanBuildFrom) @bci=3, line=108 (Interpreted frame)
>>>>  -
>>>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(java.lang.String[])
>>>> @bci=279, line=447 (Interpreted frame)
>>>>  -
>>>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh()
>>>> @bci=8, line=453 (Interpreted frame)
>>>>  - 
>>>> org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute()
>>>> @bci=26, line=465 (Interpreted frame)
>>>>  - 
>>>> org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache()
>>>> @bci=12, line=463 (Interpreted frame)
>>>>  - org.apache.spark.sql.sources.HadoopFsRelation.refresh() @bci=1,
>>>> line=540 (Interpreted frame)
>>>>  -
>>>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.refresh()
>>>> @bci=1, line=204 (Interpreted frame)
>>>>  -
>>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp()
>>>> @bci=392, line=152 (Interpreted frame)
>>>>  -
>>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply()
>>>> @bci=1, line=108 (Interpreted frame)
>>>>  -
>>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply()
>>>> @bci=1, line=108 (Interpreted frame)
>>>>  -
>>>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(org.apache.spark.sql.SQLContext,
>>>> org.apache.spark.sql.SQLContext$QueryExecution, scala.Function0) @bci=96,
>>>> line=56 (Interpreted frame)
>>>>  -
>>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(org.apache.spark.sql.SQLContext)
>>>> @bci=718, line=108 (Interpreted frame)
>>>>  -
>>>> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute()
>>>> @bci=20, line=57 (Interpreted frame)
>>>>  - org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult()
>>>> @bci=15, line=57 (Interpreted frame)
>>>>  - org.apache.spark.sql.execution.ExecutedCommand.doExecute() @bci=12,
>>>> line=69 (Interpreted frame)
>>>>  - org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply()
>>>> @bci=11, line=140 (Interpreted frame)
>>>>  - org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply()
>>>> @bci=1, line=138 (Interpreted frame)
>>>>  -
>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(org.apache.spark.SparkContext,
>>>> java.lang.String, boolean, boolean, scala.Function0) @bci=131, line=147
>>>> (Interpreted frame)
>>>>  - org.apache.spark.sql.execution.SparkPlan.execute() @bci=189,
>>>> line=138 (Interpreted frame)
>>>>  - org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute()
>>>> @bci=21, line=933 (Interpreted frame)
>>>>  - org.apache.spark.sql.SQLContext$QueryExecution.toRdd() @bci=13,
>>>> line=933 (Interpreted frame)
>>>>  -
>>>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(org.apache.spark.sql.SQLContext,
>>>> java.lang.String, java.lang.String[], org.apache.spark.sql.SaveMode,
>>>> scala.collection.immutable.Map, org.apache.spark.sql.DataFrame) @bci=293,
>>>> line=197 (Interpreted frame)
>>>>  - org.apache.spark.sql.DataFrameWriter.save() @bci=64, line=146
>>>> (Interpreted frame)
>>>>  - org.apache.spark.sql.DataFrameWriter.save(java.lang.String) @bci=24,
>>>> line=137 (Interpreted frame)
>>>>  - org.apache.spark.sql.DataFrameWriter.parquet(java.lang.String)
>>>> @bci=8, line=304 (Interpreted frame)
>>>>
>>>> Best Regards,
>>>>
>>>> Jerry
>>>>
>>>>
>>>>
>>>
>>
>
