Recently we have been converting some data warehouse tables from textfile to ORC
format. Some of our Hive SQL queries that read these ORC tables now fail at the
reduce stage. The reducers fail while copying map outputs, with the following exception:

Caused by: java.lang.OutOfMemoryError: Java heap space
        at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:56)
        at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:46)
        at org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.<init>(InMemoryMapOutput.java:63)
        at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.unconditionalReserve(MergeManagerImpl.java:297)
        at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.reserve(MergeManagerImpl.java:287)
        at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyMapOutput(Fetcher.java:411)
        at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:341)
        at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:165)



The affected SQL queries have the following form:

set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.dynamic.partition=true;
set hive.exec.max.dynamic.partitions=10000;
insert overwrite table xxx partition(`day`)
select
    count(distinct col1) c1,
    count(distinct col2) c2,
    ...
    count(distinct col11) col11
from t
group by col12, col13



Here, consider one specific job with a totalInputFileSize of 24G (ORC
compressed). It launches 97 maps (mapred.max.split.size = 256M) and 30
reduces (hive.exec.reducers.bytes.per.reducer = 1G).
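
As a rough sanity check on the map count (assuming splits are cut purely by
mapred.max.split.size):

    24 * 1024 / 256 = 96, in line with the 97 map tasks

The reducer count is estimated by Hive from totalInputFileSize and
hive.exec.reducers.bytes.per.reducer (more on that in question 2 below).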

Since there are so many count(distinct) aggregations, the total reduce shuffle
bytes grow to 59G (LZO compressed, around 550G after decompression). The average
map output segment each reducer fetches is therefore 550 * 1024 / 97 / 30 ≈ 193M.
I noticed two default parameters that control the memory usage of the shuffle:

mapreduce.reduce.shuffle.input.buffer.percent = 0.9
mapreduce.reduce.shuffle.memory.limit.percent = 0.25



The memoryLimit and maxSingleShuffleLimit are derived as follows:

memoryLimit = total_memory * $mapreduce.reduce.shuffle.input.buffer.percent

maxSingleShuffleLimit = memoryLimit * $mapreduce.reduce.shuffle.memory.limit.percent

Here maxSingleShuffleLimit is the threshold below which a map output is shuffled
into memory; larger outputs are written to disk instead.

From the log we can find the actual runtime values:

2014-05-04 16:39:27,129 INFO [main] org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl: MergerManager: memoryLimit=1336252800, maxSingleShuffleLimit=334063200, mergeThreshold=881926912, ioSortFactor=10, memToMemMergeOutputsThreshold=10
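
These logged values are consistent with the formulas above. Working backwards:

    total_memory          = 1336252800 / 0.9  ≈ 1484725333 bytes (about 1.4G, i.e. the reducer
                            JVM's max heap, assuming the default total-memory source)
    maxSingleShuffleLimit = 1336252800 * 0.25 = 334063200, exactly the value in the log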



Now consider the case where the memory already in use is just below memoryLimit
and another map output is shuffled into memory. Under this configuration the
total memory used can exceed the heap:

total_memory_used = memoryLimit + maxSingleShuffleLimit
                  = total_memory * input_buffer_percent * (1 + memory_limit_percent)
                  = total_memory * 0.9 * 1.25
                  = total_memory * 1.125
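
To make the overcommit concrete, here is a small self-contained Java sketch. The
reserve() method mimics my understanding of the check in MergeManagerImpl.reserve()
(the segment size is only compared against maxSingleShuffleLimit, and usedMemory is
compared against memoryLimit before the new size is added); the class name and the
heap value are illustrative, with the heap back-derived from the logged memoryLimit:

public class ShuffleOvercommitSketch {

    // Values back-derived from the MergerManager log line above (approximate).
    static final long TOTAL_MEMORY = 1_484_725_333L;                     // reducer JVM max heap
    static final long MEMORY_LIMIT = (long) (TOTAL_MEMORY * 0.90);       // input.buffer.percent = 0.9
    static final long MAX_SINGLE_SHUFFLE = (long) (MEMORY_LIMIT * 0.25); // memory.limit.percent = 0.25

    static long usedMemory = 0;

    // Mimics the in-memory reservation check: the new segment's size is not added
    // to usedMemory before the comparison, so usage can end up at
    // memoryLimit + maxSingleShuffleLimit in the worst case.
    static boolean reserve(long requestedSize) {
        if (requestedSize >= MAX_SINGLE_SHUFFLE) return false; // large segment: goes to disk
        if (usedMemory > MEMORY_LIMIT) return false;           // stall the fetcher
        usedMemory += requestedSize;                           // in-memory copy reserved
        return true;
    }

    public static void main(String[] args) {
        usedMemory = MEMORY_LIMIT;              // already at the limit, but the check still passes
        reserve(MAX_SINGLE_SHUFFLE - 1);        // one more near-maximum segment gets reserved

        System.out.printf("worst-case usedMemory = %d bytes (%.3f x heap)%n",
                usedMemory, (double) usedMemory / TOTAL_MEMORY);   // prints ~1.125 x heap
    }
}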

When I set mapreduce.reduce.shuffle.input.buffer.percent to 0.6 the job runs
fine (0.6 * 1.25 = 0.75, which leaves plenty of headroom in the heap).


Here are my questions:
1. Are the default settings for shuffling suitable? Or am I missing something?
2. Although the job uses fewer maps and reduces after we compress the data with
ORC, it runs slower than before; when I increase the number of reducers it
finishes faster. Could we improve the estimateNumberOfReducers algorithm to take
the input data format into consideration (e.g. something along the lines of the
sketch below)?
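
For illustration only, a hypothetical adjustment; all names and the expansion
factor below are made up, this is not Hive's actual estimateNumberOfReducers:

public final class ReducerEstimateSketch {

    // Idea from question 2: scale the on-disk input size by an estimated
    // expansion factor for columnar/compressed formats before sizing reducers.
    // (e.g. 59G of shuffle bytes from 24G of ORC input suggests a large expansion.)
    static long estimateReducers(long totalInputFileSize,
                                 long bytesPerReducer,        // hive.exec.reducers.bytes.per.reducer
                                 long maxReducers,            // hive.exec.reducers.max
                                 double formatExpansionFactor) {
        long effectiveInput = (long) (totalInputFileSize * formatExpansionFactor);
        long reducers = (effectiveInput + bytesPerReducer - 1) / bytesPerReducer; // ceiling division
        return Math.max(1, Math.min(reducers, maxReducers));
    }

    public static void main(String[] args) {
        long gb = 1024L * 1024 * 1024;
        // With no adjustment (factor 1.0) a 24G job gets ~24 reducers;
        // with an assumed 2.5x expansion for ORC it would get ~60.
        System.out.println(estimateReducers(24 * gb, 1 * gb, 999, 1.0));
        System.out.println(estimateReducers(24 * gb, 1 * gb, 999, 2.5));
    }
}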

Any help is appreciated.
