I see what you are saying. Full stack trace:

java.io.IOException: Unable to acquire 4194304 bytes of memory
      at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
      at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:349)
      at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertKVRecord(UnsafeExternalSorter.java:478)
      at org.apache.spark.sql.execution.UnsafeKVExternalSorter.insertKV(UnsafeKVExternalSorter.java:138)
      at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:489)
      at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:379)
      at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
      at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
      at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
      at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
      at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
      at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
      at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:99)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
      at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
      at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
      at org.apache.spark.scheduler.Task.run(Task.scala:88)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      at java.lang.Thread.run(Thread.java:785)

On 16 September 2015 at 09:30, Reynold Xin <r...@databricks.com> wrote:

> Can you paste the entire stacktrace of the error? In your original email
> you only included the last function call.
>
> Maybe I'm missing something here, but I still think the bad heuristic is
> the issue.
>
> Some operators pre-reserve memory before running anything in order to
> avoid starvation. For example, imagine we have an aggregate followed by a
> sort. If the aggregate has very high cardinality, uses up all the memory,
> and even starts spilling (falling back to sort-based aggregation), there
> isn't any memory available for the sort operator to use. To work around
> this, each operator reserves a page of memory before it processes any data.
>
> Page size is computed by Spark using:
>
> total amount of execution memory / (maximum number of active tasks * 16)
>
> and then rounded up to the next power of 2 and capped between 1MB and 64MB.
>
> That is to say, in the worst case we should be able to reserve at least 8
> pages per task (nominally 16, at most halved because the page size is
> rounded up to the next power of 2).
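>
> In Scala, a minimal sketch of that heuristic (the constants and clamp
> bounds here come from the description above, not from the actual
> ShuffleMemoryManager code, so treat it as an approximation):
>
>   // Sketch only: page size from execution memory and the estimated
>   // maximum number of active tasks.
>   def getPageSizeSketch(maxExecutionMemory: Long, maxActiveTasks: Int): Long = {
>     val minPageSize = 1L * 1024 * 1024     // 1MB lower cap
>     val maxPageSize = 64L * 1024 * 1024    // 64MB upper cap
>     val pagesPerTask = 16                  // the "* 16" in the formula above
>     val raw = maxExecutionMemory / (maxActiveTasks.toLong * pagesPerTask)
>     var size = 1L
>     while (size < raw) size <<= 1          // round up to the next power of 2
>     math.min(maxPageSize, math.max(minPageSize, size))
>   }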
>
> However, in your case the max number of active tasks is 32 (set by the
> test env), while the page size is determined using the number of cores
> (8 in your case). So it is off by a factor of 4. As a result, with this
> page size each task is only guaranteed 2 pages. That is to say, if you
> have more than 3 operators that need a page reservation (e.g. an
> aggregate followed by a join on the group-by key followed by a shuffle,
> which seems to be the case for join31.q), the task can fail to reserve
> memory before running anything.
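>
> (Concretely, using the numbers further down this thread: with roughly
> 480MB of execution memory and 32 active tasks, each task's fair share is
> about 480MB / 32 = 15MB, so at a 4MB page size only 3 pages fit before
> the 4th reservation fails -- the 4194304-byte allocation failure in the
> stack trace above.)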
>
>
> There is a 2nd problem (maybe this is the one you were trying to point
> out?), which is that tasks running at the same time compete with each
> other for memory. Spark allows each task to claim up to a 2/N share of
> memory, where N is the number of active tasks. If a task is launched
> before the others and quickly hogs a lot of memory, the tasks launched
> after it might not be able to get enough of a memory allocation, and thus
> will fail. This is not super ideal, but probably fine because tasks can
> be retried, and can succeed on retry.
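>
> To illustrate, here is a toy Scala sketch of that sharing model -- it is
> not Spark's actual ShuffleMemoryManager (which does per-task bookkeeping
> and can make tasks wait or spill), just the "cap tied to the number of
> active tasks" idea described above; the class and method names are made
> up for the example:
>
>   // Toy model: each task may hold at most shareFactor * maxMemory / N
>   // bytes, where N is the number of currently registered (active) tasks.
>   class ToyMemoryPool(maxMemory: Long, shareFactor: Double = 2.0) {
>     private var held = Map.empty[Long, Long]   // taskId -> bytes held
>
>     def register(taskId: Long): Unit = synchronized {
>       held += taskId -> held.getOrElse(taskId, 0L)
>     }
>
>     // Returns the number of bytes actually granted (possibly 0).
>     def tryToAcquire(taskId: Long, bytes: Long): Long = synchronized {
>       val n = math.max(held.size, 1)
>       val perTaskCap = (shareFactor * maxMemory / n).toLong
>       val free = maxMemory - held.values.sum
>       val grant = math.max(0L, math.min(bytes, math.min(free, perTaskCap - held(taskId))))
>       held += taskId -> (held(taskId) + grant)
>       grant
>     }
>   }
>
> If an early task grabs most of maxMemory while few tasks are registered,
> a later task's tryToAcquire here returns less than a page (or 0), which
> is the failure mode described above.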
>
>
> On Wed, Sep 16, 2015 at 1:07 AM, Pete Robbins <robbin...@gmail.com> wrote:
>
>> ok so let me try again ;-)
>>
>> I don't think that the page size calculation matters apart from hitting
>> the allocation limit earlier if the page size is too large.
>>
>> If a task is going to need X bytes, it is going to need X bytes. In this
>> case, for at least one of the tasks, X > maxmemory/no_active_tasks at some
>> point during execution. A smaller page size may use the memory more
>> efficiently but would not necessarily avoid this issue.
>>
>> The next question would be: is the memory limit per task of
>> max_memory/no_active_tasks reasonable? It seems fair, but currently an
>> exception is thrown when this limit is reached; maybe the task could
>> instead wait for no_active_tasks to decrease?
>>
>> I think what causes my test issue is that the 32 tasks don't execute as
>> quickly on my 8-core box, so more are active at any one time.
>>
>> I will experiment with the page size calculation to see what effect it
>> has.
>>
>> Cheers,
>>
>>
>>
>> On 16 September 2015 at 06:53, Reynold Xin <r...@databricks.com> wrote:
>>
>>> It is exactly the issue here, isn't it?
>>>
>>> We are using memory / N, where N should be the maximum number of active
>>> tasks. In the current master, we use the number of cores to approximate
>>> the number of tasks -- but that turned out to be a bad approximation in
>>> tests, because the number of tasks is set to 32 to increase concurrency.
>>>
>>>
>>> On Tue, Sep 15, 2015 at 10:47 PM, Pete Robbins <robbin...@gmail.com>
>>> wrote:
>>>
>>>> Oops... I meant to say "The page size calculation is NOT the issue here"
>>>>
>>>> On 16 September 2015 at 06:46, Pete Robbins <robbin...@gmail.com>
>>>> wrote:
>>>>
>>>>> The page size calculation is the issue here as there is plenty of free
>>>>> memory, although there is maybe a fair bit of wasted space in some pages.
>>>>> It is that when we have a lot of tasks each is only allowed to reach 1/n
>>>>> of the available memory and several of the tasks bump into that limit.
>>>>> With tasks at 4 times the number of cores there will be some contention
>>>>> and so they remain active for longer.
>>>>>
>>>>> So I think this is a test case issue, configuring the number of
>>>>> executor cores too high.
>>>>>
>>>>> On 15 September 2015 at 18:54, Reynold Xin <r...@databricks.com>
>>>>> wrote:
>>>>>
>>>>>> Maybe we can change the heuristic in the memory calculation to use
>>>>>> SparkContext.defaultParallelism if it is in local mode.
>>>>>>
>>>>>>
>>>>>> On Tue, Sep 15, 2015 at 10:28 AM, Pete Robbins <robbin...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Yes, and at least there is an override by setting
>>>>>>> spark.sql.test.master to local[8]; in fact local[16] worked on my
>>>>>>> 8-core box.
>>>>>>>
>>>>>>> I'm happy to use this as a workaround, but the hard-coded 32 will fail
>>>>>>> running build/tests on a clean checkout if you only have 8 cores.
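>>>>>>>
>>>>>>> For reference, since the test reads the property via
>>>>>>> System.getProperty (see the TestHive snippet quoted below), the
>>>>>>> override just needs to be a JVM system property on the test JVM,
>>>>>>> e.g. -Dspark.sql.test.master=local[8], or set programmatically
>>>>>>> before TestHive is first referenced -- roughly (where exactly this
>>>>>>> is wired into the build is an assumption here):
>>>>>>>
>>>>>>>   // must run before TestHive is initialized, otherwise local[32] wins
>>>>>>>   System.setProperty("spark.sql.test.master", "local[8]")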
>>>>>>>
>>>>>>> On 15 September 2015 at 17:40, Marcelo Vanzin <van...@cloudera.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> That test explicitly sets the number of executor cores to 32.
>>>>>>>>
>>>>>>>> object TestHive
>>>>>>>>   extends TestHiveContext(
>>>>>>>>     new SparkContext(
>>>>>>>>       System.getProperty("spark.sql.test.master", "local[32]"),
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Sep 14, 2015 at 11:22 PM, Reynold Xin <r...@databricks.com>
>>>>>>>> wrote:
>>>>>>>> > Yea I think this is where the heuristic is failing -- it uses 8
>>>>>>>> > cores to approximate the number of active tasks, but the tests are
>>>>>>>> > somehow using 32 (maybe because the test explicitly sets it to
>>>>>>>> > that, or you set it yourself? I'm not sure which one)
>>>>>>>> >
>>>>>>>> > On Mon, Sep 14, 2015 at 11:06 PM, Pete Robbins <
>>>>>>>> robbin...@gmail.com> wrote:
>>>>>>>> >>
>>>>>>>> >> Reynold, thanks for replying.
>>>>>>>> >>
>>>>>>>> >> getPageSize parameters: maxMemory=515396075, numCores=0
>>>>>>>> >> Calculated values: cores=8, default=4194304
>>>>>>>> >>
>>>>>>>> >> So am I getting a large page size as I only have 8 cores?
>>>>>>>> >>
>>>>>>>> >> On 15 September 2015 at 00:40, Reynold Xin <r...@databricks.com>
>>>>>>>> wrote:
>>>>>>>> >>>
>>>>>>>> >>> Pete - can you do me a favor?
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>>>>>> >>>
>>>>>>>> >>> Print the parameters that are passed into the getPageSize
>>>>>>>> >>> function, and check their values.
>>>>>>>> >>>
>>>>>>>> >>> On Mon, Sep 14, 2015 at 4:32 PM, Reynold Xin <
>>>>>>>> r...@databricks.com> wrote:
>>>>>>>> >>>>
>>>>>>>> >>>> Is this on latest master / branch-1.5?
>>>>>>>> >>>>
>>>>>>>> >>>> out of the box we reserve only 16% (0.2 * 0.8) of the memory for
>>>>>>>> >>>> execution (e.g. aggregate, join) / shuffle sorting. With a 3GB
>>>>>>>> >>>> heap, that's 480MB. So each task gets 480MB / 32 = 15MB, and each
>>>>>>>> >>>> operator reserves at least one page for execution. If your page
>>>>>>>> >>>> size is 4MB, it only takes 3 operators to use up its memory.
>>>>>>>> >>>>
>>>>>>>> >>>> The thing is page size is dynamically determined -- and in your
>>>>>>>> >>>> case it should be smaller than 4MB.
>>>>>>>> >>>>
>>>>>>>> >>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>>>>>> >>>>
>>>>>>>> >>>> Maybe there is a place in the maven tests where we explicitly set
>>>>>>>> >>>> the page size (spark.buffer.pageSize) to 4MB? If yes, we need to
>>>>>>>> >>>> find it and just remove it.
>>>>>>>> >>>>
>>>>>>>> >>>>
>>>>>>>> >>>> On Mon, Sep 14, 2015 at 4:16 AM, Pete Robbins <robbin...@gmail.com>
>>>>>>>> >>>> wrote:
>>>>>>>> >>>>>
>>>>>>>> >>>>> I keep hitting errors running the tests on 1.5 such as
>>>>>>>> >>>>>
>>>>>>>> >>>>>
>>>>>>>> >>>>> - join31 *** FAILED ***
>>>>>>>> >>>>>   Failed to execute query using catalyst:
>>>>>>>> >>>>>   Error: Job aborted due to stage failure: Task 9 in stage 3653.0
>>>>>>>> >>>>> failed 1 times, most recent failure: Lost task 9.0 in stage
>>>>>>>> >>>>> 3653.0 (TID 123363, localhost): java.io.IOException: Unable to
>>>>>>>> >>>>> acquire 4194304 bytes of memory
>>>>>>>> >>>>>       at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
>>>>>>>> >>>>>
>>>>>>>> >>>>>
>>>>>>>> >>>>> This is using the command
>>>>>>>> >>>>> build/mvn -Pyarn -Phadoop-2.2 -Phive -Phive-thriftserver  test
>>>>>>>> >>>>>
>>>>>>>> >>>>>
>>>>>>>> >>>>> I don't see these errors in any of the amplab jenkins builds. Do
>>>>>>>> >>>>> those builds have any configuration/environment that I may be
>>>>>>>> >>>>> missing? My build is running with whatever defaults are in the
>>>>>>>> >>>>> top-level pom.xml, e.g. -Xmx3G.
>>>>>>>> >>>>>
>>>>>>>> >>>>> I can make these tests pass by setting
>>>>>>>> >>>>> spark.shuffle.memoryFraction=0.6 in the HiveCompatibilitySuite
>>>>>>>> >>>>> rather than the default 0.2 value.
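>>>>>>>> >>>>>
>>>>>>>> >>>>> For what it's worth, that change boils down to something like
>>>>>>>> >>>>> the following on the SparkConf the test context is built from
>>>>>>>> >>>>> (exactly where HiveCompatibilitySuite has to apply it is an
>>>>>>>> >>>>> assumption here; it must be in place before the SparkContext
>>>>>>>> >>>>> starts):
>>>>>>>> >>>>>
>>>>>>>> >>>>>   // hypothetical: raise the shuffle/execution memory fraction
>>>>>>>> >>>>>   // from the default 0.2 for the test SparkContext
>>>>>>>> >>>>>   val conf = new org.apache.spark.SparkConf()
>>>>>>>> >>>>>     .set("spark.shuffle.memoryFraction", "0.6")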
>>>>>>>> >>>>>
>>>>>>>> >>>>> Trying to analyze what is going on with the test, it seems
>>>>>>>> >>>>> related to the number of active tasks, which rises to 32, and so
>>>>>>>> >>>>> the ShuffleMemoryManager allows less memory per task even though
>>>>>>>> >>>>> most of those tasks do not have any memory allocated to them.
>>>>>>>> >>>>>
>>>>>>>> >>>>> Has anyone seen issues like this before?
>>>>>>>> >>>>
>>>>>>>> >>>>
>>>>>>>> >>>
>>>>>>>> >>
>>>>>>>> >
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Marcelo
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
