Recently we have been converting some data warehouse tables from textfile to ORC format. Some of our Hive SQL jobs that read these ORC tables now fail at the reduce stage. The reducers die while copying map outputs, with the following exception:
    Caused by: java.lang.OutOfMemoryError: Java heap space
        at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:56)
        at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:46)
        at org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.<init>(InMemoryMapOutput.java:63)
        at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.unconditionalReserve(MergeManagerImpl.java:297)
        at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.reserve(MergeManagerImpl.java:287)
        at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyMapOutput(Fetcher.java:411)
        at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:341)
        at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:165)

The affected queries all have the following form:

    set hive.exec.dynamic.partition.mode=nonstrict;
    set hive.exec.dynamic.partition=true;
    set hive.exec.max.dynamic.partitions=10000;

    insert overwrite table xxx partition(`day`)
    select
        count(distinct col1) c1,
        count(distinct col2) c2,
        ...
        count(distinct col11) col11
    from t
    group by col12, col13

Consider one specific job with a totalInputFileSize of 24G (ORC compressed). It launches 97 maps (mapred.max.split.size = 256M) and 30 reduces (hive.exec.reducers.bytes.per.reducer = 1G). Because of the many count(distinct ...) expressions, the total reduce shuffle bytes grow to 59G (LZO compressed, around 550G after decompression), so the average map output segment each reducer fetches is about 550 * 1024 / 97 / 30 ≈ 193M.

I notice two default parameters that control the memory usage of the shuffle phase:

    mapreduce.reduce.shuffle.input.buffer.percent = 0.9
    mapreduce.reduce.shuffle.memory.limit.percent = 0.25

memoryLimit and maxSingleShuffleLimit are derived from them as:

    memoryLimit = total_memory * mapreduce.reduce.shuffle.input.buffer.percent
    maxSingleShuffleLimit = memoryLimit * mapreduce.reduce.shuffle.memory.limit.percent

Here maxSingleShuffleLimit is the threshold below which a map output is shuffled into memory rather than to disk.

From the reducer log we can see the actual runtime values:

    2014-05-04 16:39:27,129 INFO [main] org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl: MergerManager: memoryLimit=1336252800, maxSingleShuffleLimit=334063200, mergeThreshold=881926912, ioSortFactor=10, memToMemMergeOutputsThreshold=10

Now suppose the memory already used is close to memoryLimit and we reserve one more in-memory map output. Under this configuration the total memory used can exceed the heap:

    total_memory_used = memoryLimit + maxSingleShuffleLimit
                      = total_memory * input_buffer_percent * (1 + memory_limit_percent)
                      = total_memory * 0.9 * 1.25
                      = total_memory * 1.125

(A small sketch that plugs in the numbers from the log above is at the end of this mail.)

When I set mapreduce.reduce.shuffle.input.buffer.percent to 0.6, the job runs well.

Here are my questions:

1. Are the default settings for shuffling suitable, or am I missing something?

2. Although the job uses fewer maps and reduces after we compress the data with ORC, it runs slower than before, and when I increase the number of reducers it finishes faster. Could the estimateNumberOfReducers algorithm be improved to take the input data format into consideration (a rough sketch of what I mean is also at the end of this mail)?

Any help is appreciated.
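For reference, here is a minimal, self-contained Java sketch of the arithmetic above. It mirrors my reading of how the limits are derived, not the actual MergeManagerImpl code, and the total heap value is back-computed from memoryLimit / 0.9, so it is an assumption rather than a number taken from the log:

    // Plug the values from the MergerManager log line into the formulas above.
    // The heap size is assumed (back-computed as memoryLimit / 0.9).
    public class ShuffleMemoryMath {
        public static void main(String[] args) {
            double inputBufferPercent = 0.9;   // mapreduce.reduce.shuffle.input.buffer.percent
            double memoryLimitPercent = 0.25;  // mapreduce.reduce.shuffle.memory.limit.percent
            long totalMemory = 1484725334L;    // assumed reduce JVM heap (~1.38 GiB)

            long memoryLimit = (long) (totalMemory * inputBufferPercent);
            long maxSingleShuffleLimit = (long) (memoryLimit * memoryLimitPercent);

            // Worst case: used memory is just below memoryLimit when one more
            // in-memory map output of maxSingleShuffleLimit bytes is reserved.
            long worstCase = memoryLimit + maxSingleShuffleLimit;

            System.out.println("memoryLimit           = " + memoryLimit);           // 1336252800, matches the log
            System.out.println("maxSingleShuffleLimit = " + maxSingleShuffleLimit); // 334063200, matches the log
            System.out.printf("worst case            = %d = %.3f x heap%n",
                    worstCase, (double) worstCase / totalMemory);                   // 1.125 x heap
        }
    }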
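And this is the workaround we are running with for now; the 0.6 value is simply what happened to work for this job, not a tuned recommendation:

    set mapreduce.reduce.shuffle.input.buffer.percent=0.6;

With 0.6 the same worst case becomes 0.6 * 1.25 = 0.75 of the heap, which leaves headroom for the reserve to overshoot.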
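To make question 2 a bit more concrete, below is a rough sketch of the kind of change I have in mind for the reducer estimate. As far as I understand, Hive today estimates the reducer count roughly as totalInputFileSize / hive.exec.reducers.bytes.per.reducer, clamped to a configured maximum; everything else here (the assumedRatio table, the numbers in it, and the method names) is hypothetical and only illustrates the idea of scaling the on-disk size by an assumed decompression ratio for columnar formats like ORC:

    // Hypothetical, format-aware variant of the reducer estimate. Only the
    // overall shape (size / bytesPerReducer, clamped to a maximum) reflects my
    // understanding of current Hive behaviour; the ratios are made-up guesses.
    public class ReducerEstimateSketch {

        // Assumed average decompression ratio per input format (pure guesses).
        static double assumedRatio(String inputFormat) {
            switch (inputFormat) {
                case "orc":      return 5.0; // columnar + compression shrinks input a lot
                case "textfile": return 1.0; // size on disk ~= size processed
                default:         return 2.0;
            }
        }

        static int estimateNumberOfReducers(long totalInputFileSize,
                                            long bytesPerReducer,
                                            int maxReducers,
                                            String inputFormat) {
            // Scale the on-disk size by the assumed decompression ratio so that
            // heavily compressed formats get proportionally more reducers.
            double estimatedRawSize = totalInputFileSize * assumedRatio(inputFormat);
            int reducers = (int) Math.ceil(estimatedRawSize / bytesPerReducer);
            return Math.max(1, Math.min(maxReducers, reducers));
        }

        public static void main(String[] args) {
            long gb = 1024L * 1024 * 1024;
            // Our job: 24G of input, 1G per reducer, an arbitrary cap of 999.
            System.out.println(estimateNumberOfReducers(24 * gb, gb, 999, "textfile")); // 24
            System.out.println(estimateNumberOfReducers(24 * gb, gb, 999, "orc"));      // 120
        }
    }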