it seems HadoopFsRelation keeps track of all part files (instead of just
the data directories). i believe this has something to do with parquet
footers but i didnt bother to look more into it. but yet the result is that
driver side it:
1) tries to keep track of all part files in a Map[Path, FileStatus]
2) it also tries to serialize the paths to all part files (instead of just
the data directories) in the Hadoop JobConf object (or create a JobConf per
part file in case of spark-avro)

i agree this approach is not scalable... i ran into it myself with
spark-avro where a job simply never gets started on a large number of part
files. i am still trying to understand better why all the part files need
to be tracked driver side but i am pretty sure i plan to remove this in our
inhouse spark version.

i also noticed code that actually assumes the schema for every part file
can be different (even within the same partition, which seems unlikely,
except if you use insert i guess), and the code tries to reconcile the
schema between all part files... i also do not think this is scalable.

sorry this became a bit of a rant


On Mon, Oct 26, 2015 at 9:56 AM, Jerry Lam <chiling...@gmail.com> wrote:

> Hi Fengdong,
>
> Why it needs more memory at the driver side when there are many
> partitions? It seems the implementation can only support use cases for a
> dozen of partition when it is over 100, it fails apart. It is also quite
> slow to initialize the loading of partition tables when the number of
> partition is over 100.
>
> Best Regards,
>
> Jerry
>
> Sent from my iPhone
>
> On 26 Oct, 2015, at 2:50 am, Fengdong Yu <fengdo...@everstring.com> wrote:
>
> How many partitions you generated?
> if Millions generated, then there is a huge memory consumed.
>
>
>
>
>
> On Oct 26, 2015, at 10:58 AM, Jerry Lam <chiling...@gmail.com> wrote:
>
> Hi guys,
>
> I mentioned that the partitions are generated so I tried to read the
> partition data from it. The driver is OOM after few minutes. The stack
> trace is below. It looks very similar to the the jstack above (note on the
> refresh method). Thanks!
>
> Name: java.lang.OutOfMemoryError
> Message: GC overhead limit exceeded
> StackTrace: java.util.Arrays.copyOf(Arrays.java:2367)
> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
> java.lang.StringBuilder.append(StringBuilder.java:132)
> org.apache.hadoop.fs.Path.toString(Path.java:384)
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:447)
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:453)org.apache.spark.sql.sources.HadoopFsRelation.org
>  
> <http://org.apache.spark.sql.sources.hadoopfsrelation.org/>$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute(interfaces.scala:465)org.apache.spark.sql.sources.HadoopFsRelation.org
>  
> <http://org.apache.spark.sql.sources.hadoopfsrelation.org/>$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache(interfaces.scala:463)
> org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:470)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache.refresh(ParquetRelation.scala:381)org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org
>  
> <http://org.apache.spark.sql.execution.datasources.parquet.parquetrelation.org/>$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache$lzycompute(ParquetRelation.scala:145)org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org
>  
> <http://org.apache.spark.sql.execution.datasources.parquet.parquetrelation.org/>$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache(ParquetRelation.scala:143)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
> scala.Option.getOrElse(Option.scala:120)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.dataSchema(ParquetRelation.scala:196)
> org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:561)
> org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:560)
> org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:31)
> org.apache.spark.sql.SQLContext.baseRelationToDataFrame(SQLContext.scala:395)
> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:267)
>
>
> On Sun, Oct 25, 2015 at 10:25 PM, Jerry Lam <chiling...@gmail.com> wrote:
>
>> Hi Josh,
>>
>> No I don't have speculation enabled. The driver took about few hours
>> until it was OOM. Interestingly, all partitions are generated successfully
>> (_SUCCESS file is written in the output directory). Is there a reason why
>> the driver needs so much memory? The jstack revealed that it called refresh
>> some file statuses. Is there a way to avoid OutputCommitCoordinator to
>> use so much memory?
>>
>> Ultimately, I choose to use partitions because most of the queries I have
>> will execute based the partition field. For example, "SELECT events from
>> customer where customer_id = 1234". If the partition is based on
>> customer_id, all events for a customer can be easily retrieved without
>> filtering the entire dataset which is much more efficient (I hope).
>> However, I notice that the implementation of the partition logic does not
>> seem to allow this type of use cases without using a lot of memory which is
>> a bit odd in my opinion. Any help will be greatly appreciated.
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>>
>> On Sun, Oct 25, 2015 at 9:25 PM, Josh Rosen <rosenvi...@gmail.com> wrote:
>>
>>> Hi Jerry,
>>>
>>> Do you have speculation enabled? A write which produces one million
>>> files / output partitions might be using tons of driver memory via the
>>> OutputCommitCoordinator's bookkeeping data structures.
>>>
>>> On Sun, Oct 25, 2015 at 5:50 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>>
>>>> Hi spark guys,
>>>>
>>>> I think I hit the same issue SPARK-8890
>>>> https://issues.apache.org/jira/browse/SPARK-8890. It is marked as
>>>> resolved. However it is not. I have over a million output directories for 1
>>>> single column in partitionBy. Not sure if this is a regression issue? Do I
>>>> need to set some parameters to make it more memory efficient?
>>>>
>>>> Best Regards,
>>>>
>>>> Jerry
>>>>
>>>>
>>>>
>>>>
>>>> On Sun, Oct 25, 2015 at 8:39 PM, Jerry Lam <chiling...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi guys,
>>>>>
>>>>> After waiting for a day, it actually causes OOM on the spark driver. I
>>>>> configure the driver to have 6GB. Note that I didn't call refresh myself.
>>>>> The method was called when saving the dataframe in parquet format. Also 
>>>>> I'm
>>>>> using partitionBy() on the DataFrameWriter to generate over 1 million
>>>>> files. Not sure why it OOM the driver after the job is marked _SUCCESS in
>>>>> the output folder.
>>>>>
>>>>> Best Regards,
>>>>>
>>>>> Jerry
>>>>>
>>>>>
>>>>> On Sat, Oct 24, 2015 at 9:35 PM, Jerry Lam <chiling...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Spark users and developers,
>>>>>>
>>>>>> Does anyone encounter any issue when a spark SQL job produces a lot
>>>>>> of files (over 1 millions), the job hangs on the refresh method? I'm 
>>>>>> using
>>>>>> spark 1.5.1. Below is the stack trace. I saw the parquet files are 
>>>>>> produced
>>>>>> but the driver is doing something very intensively (it uses all the 
>>>>>> cpus).
>>>>>> Does it mean Spark SQL cannot be used to produce over 1 million files in 
>>>>>> a
>>>>>> single job?
>>>>>>
>>>>>> Thread 528: (state = BLOCKED)
>>>>>>  - java.util.Arrays.copyOf(char[], int) @bci=1, line=2367 (Compiled
>>>>>> frame)
>>>>>>  - java.lang.AbstractStringBuilder.expandCapacity(int) @bci=43,
>>>>>> line=130 (Compiled frame)
>>>>>>  - java.lang.AbstractStringBuilder.ensureCapacityInternal(int)
>>>>>> @bci=12, line=114 (Compiled frame)
>>>>>>  - java.lang.AbstractStringBuilder.append(java.lang.String) @bci=19,
>>>>>> line=415 (Compiled frame)
>>>>>>  - java.lang.StringBuilder.append(java.lang.String) @bci=2, line=132
>>>>>> (Compiled frame)
>>>>>>  - org.apache.hadoop.fs.Path.toString() @bci=128, line=384 (Compiled
>>>>>> frame)
>>>>>>  -
>>>>>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(org.apache.hadoop.fs.FileStatus)
>>>>>> @bci=4, line=447 (Compiled frame)
>>>>>>  -
>>>>>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(java.lang.Object)
>>>>>> @bci=5, line=447 (Compiled frame)
>>>>>>  -
>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object)
>>>>>> @bci=9, line=244 (Compiled frame)
>>>>>>  -
>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object)
>>>>>> @bci=2, line=244 (Compiled frame)
>>>>>>  -
>>>>>> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
>>>>>> scala.Function1) @bci=22, line=33 (Compiled frame)
>>>>>>  - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
>>>>>> @bci=2, line=108 (Compiled frame)
>>>>>>  -
>>>>>> scala.collection.TraversableLike$class.map(scala.collection.TraversableLike,
>>>>>> scala.Function1, scala.collection.generic.CanBuildFrom) @bci=17, line=244
>>>>>> (Compiled frame)
>>>>>>  - scala.collection.mutable.ArrayOps$ofRef.map(scala.Function1,
>>>>>> scala.collection.generic.CanBuildFrom) @bci=3, line=108 (Interpreted 
>>>>>> frame)
>>>>>>  -
>>>>>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(java.lang.String[])
>>>>>> @bci=279, line=447 (Interpreted frame)
>>>>>>  -
>>>>>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh()
>>>>>> @bci=8, line=453 (Interpreted frame)
>>>>>>  - org.apache.spark.sql.sources.HadoopFsRelation.org
>>>>>> <http://org.apache.spark.sql.sources.hadoopfsrelation.org/>$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute()
>>>>>> @bci=26, line=465 (Interpreted frame)
>>>>>>  - org.apache.spark.sql.sources.HadoopFsRelation.org
>>>>>> <http://org.apache.spark.sql.sources.hadoopfsrelation.org/>$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache()
>>>>>> @bci=12, line=463 (Interpreted frame)
>>>>>>  - org.apache.spark.sql.sources.HadoopFsRelation.refresh() @bci=1,
>>>>>> line=540 (Interpreted frame)
>>>>>>  -
>>>>>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.refresh()
>>>>>> @bci=1, line=204 (Interpreted frame)
>>>>>>  -
>>>>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp()
>>>>>> @bci=392, line=152 (Interpreted frame)
>>>>>>  -
>>>>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply()
>>>>>> @bci=1, line=108 (Interpreted frame)
>>>>>>  -
>>>>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply()
>>>>>> @bci=1, line=108 (Interpreted frame)
>>>>>>  -
>>>>>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(org.apache.spark.sql.SQLContext,
>>>>>> org.apache.spark.sql.SQLContext$QueryExecution, scala.Function0) @bci=96,
>>>>>> line=56 (Interpreted frame)
>>>>>>  -
>>>>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(org.apache.spark.sql.SQLContext)
>>>>>> @bci=718, line=108 (Interpreted frame)
>>>>>>  -
>>>>>> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute()
>>>>>> @bci=20, line=57 (Interpreted frame)
>>>>>>  - org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult()
>>>>>> @bci=15, line=57 (Interpreted frame)
>>>>>>  - org.apache.spark.sql.execution.ExecutedCommand.doExecute()
>>>>>> @bci=12, line=69 (Interpreted frame)
>>>>>>  -
>>>>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply()
>>>>>> @bci=11, line=140 (Interpreted frame)
>>>>>>  -
>>>>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply() 
>>>>>> @bci=1,
>>>>>> line=138 (Interpreted frame)
>>>>>>  -
>>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(org.apache.spark.SparkContext,
>>>>>> java.lang.String, boolean, boolean, scala.Function0) @bci=131, line=147
>>>>>> (Interpreted frame)
>>>>>>  - org.apache.spark.sql.execution.SparkPlan.execute() @bci=189,
>>>>>> line=138 (Interpreted frame)
>>>>>>  - org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute()
>>>>>> @bci=21, line=933 (Interpreted frame)
>>>>>>  - org.apache.spark.sql.SQLContext$QueryExecution.toRdd() @bci=13,
>>>>>> line=933 (Interpreted frame)
>>>>>>  -
>>>>>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(org.apache.spark.sql.SQLContext,
>>>>>> java.lang.String, java.lang.String[], org.apache.spark.sql.SaveMode,
>>>>>> scala.collection.immutable.Map, org.apache.spark.sql.DataFrame) @bci=293,
>>>>>> line=197 (Interpreted frame)
>>>>>>  - org.apache.spark.sql.DataFrameWriter.save() @bci=64, line=146
>>>>>> (Interpreted frame)
>>>>>>  - org.apache.spark.sql.DataFrameWriter.save(java.lang.String)
>>>>>> @bci=24, line=137 (Interpreted frame)
>>>>>>  - org.apache.spark.sql.DataFrameWriter.parquet(java.lang.String)
>>>>>> @bci=8, line=304 (Interpreted frame)
>>>>>>
>>>>>> Best Regards,
>>>>>>
>>>>>> Jerry
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
>

Reply via email to