I see what you are saying. Full stack trace:

java.io.IOException: Unable to acquire 4194304 bytes of memory
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:349)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertKVRecord(UnsafeExternalSorter.java:478)
    at org.apache.spark.sql.execution.UnsafeKVExternalSorter.insertKV(UnsafeKVExternalSorter.java:138)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:489)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:379)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:99)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.lang.Thread.run(Thread.java:785)
On 16 September 2015 at 09:30, Reynold Xin <r...@databricks.com> wrote:

> Can you paste the entire stacktrace of the error? In your original email you only included the last function call.
>
> Maybe I'm missing something here, but I still think the bad heuristic is the issue.
>
> Some operators pre-reserve memory before running anything in order to avoid starvation. For example, imagine we have an aggregate followed by a sort. If the aggregate is very high cardinality, uses up all the memory and even starts spilling (falling back to sort-based aggregation), there is no memory available at all for the sort operator to use. To work around this, each operator reserves a page of memory before it processes any data.
>
> Page size is computed by Spark as:
>
>   the total amount of execution memory / (maximum number of active tasks * 16)
>
> then rounded up to the next power of 2 and capped between 1MB and 64MB.
>
> That is to say, in the worst case a task should be able to reserve at least 8 pages (the factor of 16, at worst halved by rounding the page size up to the next power of 2).
>
> However, in your case the max number of active tasks is 32 (set by the test env), while the page size is determined using the number of cores (8 in your case). So it is off by a factor of 4. As a result, with this page size each task is only guaranteed 2 pages. That is to say, if you have 3 or more operators that need a page reservation (e.g. an aggregate followed by a join on the group-by key followed by a shuffle - which seems to be the case for join31.q), the task can fail to reserve memory before running anything.
>
> There is a 2nd problem (maybe this is the one you were trying to point out?): tasks running at the same time compete with each other for memory. Spark allows each task to claim up to a 2/N share of memory, where N is the number of active tasks. If a task is launched before the others and quickly hogs a lot of memory, the tasks launched after it may not be able to get enough memory and will fail. This is not ideal, but probably fine because tasks can be retried and can succeed on retry.
>
> On Wed, Sep 16, 2015 at 1:07 AM, Pete Robbins <robbin...@gmail.com> wrote:
>
>> ok so let me try again ;-)
>>
>> I don't think the page size calculation matters, apart from hitting the allocation limit earlier if the page size is too large.
>>
>> If a task is going to need X bytes, it is going to need X bytes. In this case, for at least one of the tasks, X > maxmemory/no_active_tasks at some point during execution. A smaller page size may use the memory more efficiently but would not necessarily avoid this issue.
>>
>> The next question would be: is the memory limit per task of max_memory/no_active_tasks reasonable? It seems fair, but currently an exception is thrown when that limit is reached; maybe the task could instead wait for no_active_tasks to decrease?
>>
>> I think what causes my test issue is that the 32 tasks don't execute as quickly on my 8-core box, so more of them are active at any one time.
>>
>> I will experiment with the page size calculation to see what effect it has.
>>
>> Cheers,
>>
>> On 16 September 2015 at 06:53, Reynold Xin <r...@databricks.com> wrote:
>>
>>> It is exactly the issue here, isn't it?
>>>
>>> We are using memory / N, where N should be the maximum number of active tasks. In the current master, we use the number of cores to approximate the number of tasks -- but that turned out to be a bad approximation in tests, because the tests set the number of cores to 32 to increase concurrency.
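
For anyone following along, my reading of the heuristic Reynold describes boils down to roughly the following. This is a sketch only: the structure follows the description above, and the names and the cores fallback are illustrative rather than the actual ShuffleMemoryManager code.

    // Sketch of the page-size heuristic discussed above (illustrative,
    // not the real ShuffleMemoryManager implementation).
    object PageSizeSketch {
      val minPageSize: Long = 1L * 1024 * 1024   // 1MB lower bound
      val maxPageSize: Long = 64L * 1024 * 1024  // 64MB upper bound

      // Round up to the next power of 2 (returns n if already a power of 2).
      private def nextPowerOf2(n: Long): Long = {
        val highBit = java.lang.Long.highestOneBit(n)
        if (highBit == n) n else highBit << 1
      }

      // maxMemory: total execution memory for shuffles/aggregates.
      // numCores: falls back to the machine's core count when not set --
      // this is the approximation of "maximum number of active tasks"
      // being questioned in this thread.
      def pageSize(maxMemory: Long, numCores: Int): Long = {
        val cores =
          if (numCores > 0) numCores
          else Runtime.getRuntime.availableProcessors()
        // Aim for roughly 16 pages per task, assuming one task per core.
        val target = maxMemory / cores / 16
        math.min(maxPageSize, math.max(minPageSize, nextPowerOf2(target)))
      }
    }

The disputed input is the cores value: the test harness runs with 32 task slots, but a calculation like this only sees the 8 physical cores, which is exactly the factor-of-4 mismatch described above.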
>>>
>>> On Tue, Sep 15, 2015 at 10:47 PM, Pete Robbins <robbin...@gmail.com> wrote:
>>>
>>>> Oops... I meant to say "The page size calculation is NOT the issue here".
>>>>
>>>> On 16 September 2015 at 06:46, Pete Robbins <robbin...@gmail.com> wrote:
>>>>
>>>>> The page size calculation is the issue here, as there is plenty of free memory, although there is maybe a fair bit of wasted space in some pages. It is that when we have a lot of tasks, each is only allowed to reach 1/n of the available memory, and several of the tasks bump into that limit. With 4 times as many tasks as cores there will be some contention, so the tasks remain active for longer.
>>>>>
>>>>> So I think this is a test-case issue: the number of executor cores is configured too high.
>>>>>
>>>>> On 15 September 2015 at 18:54, Reynold Xin <r...@databricks.com> wrote:
>>>>>
>>>>>> Maybe we can change the heuristic in the memory calculation to use SparkContext.defaultParallelism if it is local mode.
>>>>>>
>>>>>> On Tue, Sep 15, 2015 at 10:28 AM, Pete Robbins <robbin...@gmail.com> wrote:
>>>>>>
>>>>>>> Yes, and at least there is an override by setting spark.sql.test.master to local[8]; in fact local[16] worked on my 8-core box.
>>>>>>>
>>>>>>> I'm happy to use this as a workaround, but the hard-coded 32 will make the build/tests fail on a clean checkout if you only have 8 cores.
>>>>>>>
>>>>>>> On 15 September 2015 at 17:40, Marcelo Vanzin <van...@cloudera.com> wrote:
>>>>>>>
>>>>>>>> That test explicitly sets the number of executor cores to 32.
>>>>>>>>
>>>>>>>> object TestHive
>>>>>>>>   extends TestHiveContext(
>>>>>>>>     new SparkContext(
>>>>>>>>       System.getProperty("spark.sql.test.master", "local[32]"),
>>>>>>>>
>>>>>>>> On Mon, Sep 14, 2015 at 11:22 PM, Reynold Xin <r...@databricks.com> wrote:
>>>>>>>> > Yea, I think this is where the heuristic is failing -- it uses 8 cores to approximate the number of active tasks, but the tests are somehow using 32 (maybe because the test explicitly sets it to that, or you set it yourself? I'm not sure which).
>>>>>>>> >
>>>>>>>> > On Mon, Sep 14, 2015 at 11:06 PM, Pete Robbins <robbin...@gmail.com> wrote:
>>>>>>>> >> Reynold, thanks for replying.
>>>>>>>> >>
>>>>>>>> >> getPageSize parameters: maxMemory=515396075, numCores=0
>>>>>>>> >> Calculated values: cores=8, default=4194304
>>>>>>>> >>
>>>>>>>> >> So am I getting a large page size as I only have 8 cores?
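
Plugging those printed values into the heuristic sketched earlier does reproduce the 4MB figure, so the numbers are at least consistent. This is a quick check only, assuming the sketch approximates the real calculation:

    // Quick check with the values printed above (maxMemory=515396075, cores=8):
    val maxMemory = 515396075L
    val cores = 8
    val target = maxMemory / (cores * 16L)            // 4026531
    val hiBit = java.lang.Long.highestOneBit(target)
    val pageSize = if (hiBit == target) hiBit else hiBit << 1  // 4194304 = 4MB
    // With 32 active tasks instead of 8 cores, the same formula would give
    // 515396075 / (32 * 16) = 1006632 -> rounded up to 1048576, i.e. 1MB pages.

So the 4194304-byte allocations in the failing tasks line up with a cores-based page size, while the per-task budget is being divided among 32 active tasks.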
>>>>>>>> >>
>>>>>>>> >> On 15 September 2015 at 00:40, Reynold Xin <r...@databricks.com> wrote:
>>>>>>>> >>
>>>>>>>> >>> Pete - can you do me a favor?
>>>>>>>> >>>
>>>>>>>> >>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>>>>>> >>>
>>>>>>>> >>> Print the parameters that are passed into the getPageSize function, and check their values.
>>>>>>>> >>>
>>>>>>>> >>> On Mon, Sep 14, 2015 at 4:32 PM, Reynold Xin <r...@databricks.com> wrote:
>>>>>>>> >>>
>>>>>>>> >>>> Is this on latest master / branch-1.5?
>>>>>>>> >>>>
>>>>>>>> >>>> Out of the box we reserve only 16% (0.2 * 0.8) of the memory for execution (e.g. aggregate, join) / shuffle sorting. With a 3GB heap, that's 480MB. So each task gets 480MB / 32 = 15MB, and each operator reserves at least one page for execution. If your page size is 4MB, it only takes 3 operators to use up its memory (3 x 4MB = 12MB of the 15MB, leaving less than a page for a fourth).
>>>>>>>> >>>>
>>>>>>>> >>>> The thing is, the page size is dynamically determined -- and in your case it should be smaller than 4MB.
>>>>>>>> >>>>
>>>>>>>> >>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>>>>>> >>>>
>>>>>>>> >>>> Maybe there is a place in the maven tests where we explicitly set the page size (spark.buffer.pageSize) to 4MB? If so, we need to find it and just remove it.
>>>>>>>> >>>>
>>>>>>>> >>>> On Mon, Sep 14, 2015 at 4:16 AM, Pete Robbins <robbin...@gmail.com> wrote:
>>>>>>>> >>>>
>>>>>>>> >>>>> I keep hitting errors running the tests on 1.5, such as
>>>>>>>> >>>>>
>>>>>>>> >>>>> - join31 *** FAILED ***
>>>>>>>> >>>>>   Failed to execute query using catalyst:
>>>>>>>> >>>>>   Error: Job aborted due to stage failure: Task 9 in stage 3653.0 failed 1 times, most recent failure: Lost task 9.0 in stage 3653.0 (TID 123363, localhost): java.io.IOException: Unable to acquire 4194304 bytes of memory
>>>>>>>> >>>>>     at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
>>>>>>>> >>>>>
>>>>>>>> >>>>> This is using the command:
>>>>>>>> >>>>>
>>>>>>>> >>>>>   build/mvn -Pyarn -Phadoop-2.2 -Phive -Phive-thriftserver test
>>>>>>>> >>>>>
>>>>>>>> >>>>> I don't see these errors in any of the amplab jenkins builds. Do those builds have any configuration/environment that I may be missing? My build is running with whatever defaults are in the top-level pom.xml, e.g. -Xmx3G.
>>>>>>>> >>>>>
>>>>>>>> >>>>> I can make these tests pass by setting spark.shuffle.memoryFraction=0.6 in the HiveCompatibilitySuite rather than the default 0.2 value.
>>>>>>>> >>>>>
>>>>>>>> >>>>> Trying to analyze what is going on with the test, it appears to be related to the number of active tasks, which rises to 32, so the ShuffleMemoryManager allows less memory per task even though most of those tasks do not have any memory allocated to them.
>>>>>>>> >>>>>
>>>>>>>> >>>>> Has anyone seen issues like this before?
>>>>>>>>
>>>>>>>> --
>>>>>>>> Marcelo
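
As a footnote on the per-task limit being discussed: the failure mode can be modelled very roughly as below. This is a deliberately simplified sketch of the behaviour described in this thread (each task capped around maxMemory / numActiveTasks, with a refused page request surfacing as the IOException above), not the actual ShuffleMemoryManager logic, which arbitrates more carefully.

    import java.io.IOException
    import scala.collection.mutable

    // Much-simplified model of the per-task arbitration described in this
    // thread: each active task may use up to roughly maxMemory / numActiveTasks,
    // and a page request beyond that cap fails the task.
    class SimpleTaskMemoryArbiter(maxMemory: Long) {
      private val grantedByTask = mutable.Map.empty[Long, Long]

      def acquirePage(taskId: Long, pageSize: Long): Unit = synchronized {
        val used = grantedByTask.getOrElseUpdate(taskId, 0L)
        val numActiveTasks = grantedByTask.size
        val perTaskCap = maxMemory / numActiveTasks
        if (used + pageSize > perTaskCap) {
          // e.g. 480MB / 32 tasks ~= 15MB per task: with 4MB pages, the
          // fourth reservation in a task already fails here.
          throw new IOException(s"Unable to acquire $pageSize bytes of memory")
        }
        grantedByTask(taskId) = used + pageSize
      }

      def releaseAll(taskId: Long): Unit = synchronized {
        grantedByTask.remove(taskId)
      }
    }

With 480MB of execution memory, 32 active tasks and 4MB pages, the cap works out at roughly 15MB per task, so a fourth page reservation in any task already fails -- which matches how few page-reserving operators it takes to trigger the error in join31.q.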