How many partitions did you generate? If millions were generated, then a huge amount of memory is consumed.
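If you are not sure, a quick way to estimate it is to count the distinct values of the partition column before writing, since partitionBy() creates one output directory per distinct value. Below is a minimal sketch for the spark-shell (Spark 1.5), assuming a hypothetical input path and the customer_id column from the example further down the thread; a sketch of the partitioned write and pruned read themselves is appended after the quoted messages at the bottom.

// Sketch only: the path and column name are assumptions, not taken from your actual job.
// In the spark-shell, sqlContext is already defined.
val df = sqlContext.read.parquet("/input/events")
// partitionBy("customer_id") would create one directory per distinct value,
// so this count approximates how many directories the driver has to list and cache.
val numPartitionDirs = df.select("customer_id").distinct().count()
println(s"partitionBy would create roughly $numPartitionDirs directories")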
> On Oct 26, 2015, at 10:58 AM, Jerry Lam <chiling...@gmail.com> wrote:
>
> Hi guys,
>
> I mentioned that the partitions are generated, so I tried to read the partition data back. The driver went OOM after a few minutes. The stack trace is below. It looks very similar to the jstack above (note the refresh method). Thanks!
>
> Name: java.lang.OutOfMemoryError
> Message: GC overhead limit exceeded
> StackTrace: java.util.Arrays.copyOf(Arrays.java:2367)
> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
> java.lang.StringBuilder.append(StringBuilder.java:132)
> org.apache.hadoop.fs.Path.toString(Path.java:384)
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:447)
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:453)
> org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute(interfaces.scala:465)
> org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache(interfaces.scala:463)
> org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:470)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache.refresh(ParquetRelation.scala:381)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache$lzycompute(ParquetRelation.scala:145)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache(ParquetRelation.scala:143)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
> scala.Option.getOrElse(Option.scala:120)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.dataSchema(ParquetRelation.scala:196)
> org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:561)
> org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:560)
> org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:31)
> org.apache.spark.sql.SQLContext.baseRelationToDataFrame(SQLContext.scala:395)
> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:267)
>
> On Sun, Oct 25, 2015 at 10:25 PM, Jerry Lam <chiling...@gmail.com> wrote:
> Hi Josh,
>
> No, I don't have speculation enabled. The driver ran for a few hours until it went OOM. Interestingly, all partitions are generated successfully (the _SUCCESS file is written in the output directory). Is there a reason why the driver needs so much memory? The jstack revealed that it was refreshing some file statuses. Is there a way to keep the OutputCommitCoordinator from using so much memory?
>
> Ultimately, I chose to use partitions because most of my queries filter on the partition field. For example, "SELECT events FROM customer WHERE customer_id = 1234". If the data is partitioned by customer_id, all events for a customer can be retrieved without scanning the entire dataset, which is much more efficient (I hope). However, I notice that the implementation of the partitioning logic does not seem to support this type of use case without using a lot of memory, which is a bit odd in my opinion. Any help will be greatly appreciated.
>
> Best Regards,
>
> Jerry
>
> On Sun, Oct 25, 2015 at 9:25 PM, Josh Rosen <rosenvi...@gmail.com> wrote:
> Hi Jerry,
>
> Do you have speculation enabled? A write which produces one million files / output partitions might be using tons of driver memory via the OutputCommitCoordinator's bookkeeping data structures.
>
> On Sun, Oct 25, 2015 at 5:50 PM, Jerry Lam <chiling...@gmail.com> wrote:
> Hi Spark guys,
>
> I think I hit the same issue as SPARK-8890 (https://issues.apache.org/jira/browse/SPARK-8890). It is marked as resolved; however, it is not. I have over a million output directories for a single column in partitionBy. Not sure if this is a regression? Do I need to set some parameters to make it more memory efficient?
>
> Best Regards,
>
> Jerry
>
> On Sun, Oct 25, 2015 at 8:39 PM, Jerry Lam <chiling...@gmail.com> wrote:
> Hi guys,
>
> After waiting for a day, it actually caused an OOM on the Spark driver. I configured the driver to have 6GB. Note that I didn't call refresh myself; the method was called when saving the DataFrame in Parquet format. Also, I'm using partitionBy() on the DataFrameWriter to generate over 1 million files. Not sure why it OOMs the driver after the job is marked _SUCCESS in the output folder.
>
> Best Regards,
>
> Jerry
>
> On Sat, Oct 24, 2015 at 9:35 PM, Jerry Lam <chiling...@gmail.com> wrote:
> Hi Spark users and developers,
>
> Has anyone encountered an issue where a Spark SQL job that produces a lot of files (over 1 million) hangs on the refresh method? I'm using Spark 1.5.1. Below is the stack trace. I see the Parquet files are produced, but the driver is doing something very intensive (it uses all the CPUs). Does it mean Spark SQL cannot be used to produce over 1 million files in a single job?
>
> Thread 528: (state = BLOCKED)
> - java.util.Arrays.copyOf(char[], int) @bci=1, line=2367 (Compiled frame)
> - java.lang.AbstractStringBuilder.expandCapacity(int) @bci=43, line=130 (Compiled frame)
> - java.lang.AbstractStringBuilder.ensureCapacityInternal(int) @bci=12, line=114 (Compiled frame)
> - java.lang.AbstractStringBuilder.append(java.lang.String) @bci=19, line=415 (Compiled frame)
> - java.lang.StringBuilder.append(java.lang.String) @bci=2, line=132 (Compiled frame)
> - org.apache.hadoop.fs.Path.toString() @bci=128, line=384 (Compiled frame)
> - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(org.apache.hadoop.fs.FileStatus) @bci=4, line=447 (Compiled frame)
> - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(java.lang.Object) @bci=5, line=447 (Compiled frame)
> - scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object) @bci=9, line=244 (Compiled frame)
> - scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object) @bci=2, line=244 (Compiled frame)
> - scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized, scala.Function1) @bci=22, line=33 (Compiled frame)
> - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1) @bci=2, line=108 (Compiled frame)
> - scala.collection.TraversableLike$class.map(scala.collection.TraversableLike, scala.Function1, scala.collection.generic.CanBuildFrom) @bci=17, line=244 (Compiled frame)
> - scala.collection.mutable.ArrayOps$ofRef.map(scala.Function1, scala.collection.generic.CanBuildFrom) @bci=3, line=108 (Interpreted frame)
> - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(java.lang.String[]) @bci=279, line=447 (Interpreted frame)
> - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh() @bci=8, line=453 (Interpreted frame)
> - org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute() @bci=26, line=465 (Interpreted frame)
> - org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache() @bci=12, line=463 (Interpreted frame)
> - org.apache.spark.sql.sources.HadoopFsRelation.refresh() @bci=1, line=540 (Interpreted frame)
> - org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.refresh() @bci=1, line=204 (Interpreted frame)
> - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp() @bci=392, line=152 (Interpreted frame)
> - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply() @bci=1, line=108 (Interpreted frame)
> - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply() @bci=1, line=108 (Interpreted frame)
> - org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(org.apache.spark.sql.SQLContext, org.apache.spark.sql.SQLContext$QueryExecution, scala.Function0) @bci=96, line=56 (Interpreted frame)
> - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(org.apache.spark.sql.SQLContext) @bci=718, line=108 (Interpreted frame)
> - org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute() @bci=20, line=57 (Interpreted frame)
> - org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult() @bci=15, line=57 (Interpreted frame)
> - org.apache.spark.sql.execution.ExecutedCommand.doExecute() @bci=12, line=69 (Interpreted frame)
> - org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply() @bci=11, line=140 (Interpreted frame)
> - org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply() @bci=1, line=138 (Interpreted frame)
> - org.apache.spark.rdd.RDDOperationScope$.withScope(org.apache.spark.SparkContext, java.lang.String, boolean, boolean, scala.Function0) @bci=131, line=147 (Interpreted frame)
> - org.apache.spark.sql.execution.SparkPlan.execute() @bci=189, line=138 (Interpreted frame)
> - org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute() @bci=21, line=933 (Interpreted frame)
> - org.apache.spark.sql.SQLContext$QueryExecution.toRdd() @bci=13, line=933 (Interpreted frame)
> - org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(org.apache.spark.sql.SQLContext, java.lang.String, java.lang.String[], org.apache.spark.sql.SaveMode, scala.collection.immutable.Map, org.apache.spark.sql.DataFrame) @bci=293, line=197 (Interpreted frame)
> - org.apache.spark.sql.DataFrameWriter.save() @bci=64, line=146 (Interpreted frame)
> - org.apache.spark.sql.DataFrameWriter.save(java.lang.String) @bci=24, line=137 (Interpreted frame)
> - org.apache.spark.sql.DataFrameWriter.parquet(java.lang.String) @bci=8, line=304 (Interpreted frame)
>
> Best Regards,
>
> Jerry
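For reference, here is a minimal sketch (spark-shell, Spark 1.5) of the kind of write and read discussed above. The paths and the customer_id column are assumptions based on the example in the thread, not your actual job, and the direct-path read at the end is a possible workaround to verify rather than a recommendation.

// Write: one directory per distinct customer_id is created under the output path,
// so a high-cardinality partition column means millions of directories for the
// driver to list and cache. Paths and column name are hypothetical.
val events = sqlContext.read.parquet("/input/events")
events.write
  .partitionBy("customer_id")
  .parquet("/output/events")

// Read with a filter on the partition column; only the matching directory should
// be scanned (partition pruning), but the reader still lists all leaf files when
// the relation is created, which is where the driver memory appears to go.
val oneCustomer = sqlContext.read.parquet("/output/events")
  .filter("customer_id = 1234")

// Possible workaround (assumption to verify): point the reader directly at a single
// partition directory to avoid listing every partition up front. Note that
// customer_id may then no longer appear as a column, since it is only inferred
// from directory names above the given path.
val direct = sqlContext.read.parquet("/output/events/customer_id=1234")

Whether the pruned read avoids the full listing in 1.5.1 is worth verifying; the stack traces above suggest the listing happens when the relation is created, before any filter is applied.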