Thanks for following up, Ted. I couldn't work out why progress tracking was being forced on for dynamic partition inserts, so thanks for your helpful explanation. I'll raise a JIRA issue regarding the problem. Do you have any idea for an alternative approach? I could have a go at implementing a fix, but I'm not sure what a better alternative might be.
In the meantime I've implemented a semantic analyzer hook that removes the counters added to the operators. The source is below in case anyone else finds it useful. To use it:

- Start Hive with the jar loaded: "export HIVE_AUX_JARS_PATH=path/to/jar"
- Add the hook: "set hive.semantic.analyzer.hook=com.atlassian.hive.RemoveCountersHook;"

Thanks,
Shaun

/*
 * Remove counters from tasks (this drastically speeds up dynamic
 * partition inserts in Amazon EMR). The counters are enabled because
 * hive.task.progress is forced on in SemanticAnalyzer.java for
 * dynamic partition insert queries. Ted Xu explains that this is
 * simply so that the job can be killed if the maximum number of
 * dynamic partitions is exceeded in the following mailing list
 * message:
 *
 * http://mail-archives.apache.org/mod_mbox/hive-user/201306.mbox/%3CCAP9%2B16xbCfvCc%3DgiKW4a9GQ588sZhGYiiKH7DS5CH9nr07i-ug%40mail.gmail.com%3E
 *
 * If you are sure the limit will not be exceeded, removing the
 * counters is extremely beneficial.
 *
 * Copyright © 2013 Atlassian Corporation Pty Ltd. Licensed
 * under the Apache License, Version 2.0 (the "License"); you may not use
 * this file except in compliance with the License. You may obtain a copy of
 * the License at http://www.apache.org/licenses/LICENSE-2.0. Unless
 * required by applicable law or agreed to in writing, software distributed
 * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
 * OR CONDITIONS OF ANY KIND, either express or implied. See the License
 * for the specific language governing permissions and limitations under
 * the License.
 *
 * There are three types of hooks, all of which are executed in Driver.java
 * (i.e. they are processed in the Hive driver/CLI, not inside the
 * MapReduce tasks):
 *
 * - Semantic analyzer hooks - declared with comma-separated class
 *   names in hive.semantic.analyzer.hook. Called both before and
 *   after semantic analysis is completed. Implement
 *   AbstractSemanticAnalyzerHook.
 * - Pre hooks - declared with comma-separated class names in
 *   hive.exec.pre.hooks. Run before the job is executed; can
 *   implement ExecuteWithHookContext or PreExecute.
 * - Post hooks - declared with comma-separated class names in
 *   hive.exec.post.hooks. Run after the job has completed; can
 *   implement ExecuteWithHookContext or PostExecute.
 */
package com.atlassian.hive;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.ExecDriver;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.AbstractSemanticAnalyzerHook;
import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;

public class RemoveCountersHook extends AbstractSemanticAnalyzerHook {
  private static final Log LOG = LogFactory.getLog(RemoveCountersHook.class);

  @Override
  public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context, ASTNode ast)
      throws SemanticException {
    //LogHelper console = SessionState.getConsole();
    //console.printInfo("RemoveCountersHook called for preAnalyze");
    LOG.info("RemoveCountersHook called for preAnalyze");
    return ast;
  }

  @Override
  public void postAnalyze(HiveSemanticAnalyzerHookContext context,
      List<Task<? extends Serializable>> rootTasks) throws SemanticException {
    LOG.info("RemoveCountersHook called for postAnalyze");
    LOG.info("Context " + context);
    LOG.info("Root tasks " + rootTasks);
    for (Task<? extends Serializable> tsk : rootTasks) {
      removeOperatorCountersFromTask(tsk);
    }
  }

  public void removeOperatorCountersFromTask(Task<? extends Serializable> task) {
    Operator.resetLastEnumUsed();
    if (task instanceof ExecDriver) {
      HashMap<String, Operator<? extends Serializable>> opMap =
          ((MapredWork) task.getWork()).getAliasToWork();
      if (!opMap.isEmpty()) {
        for (Operator<? extends Serializable> op : opMap.values()) {
          removeOperatorCountersFromOp(task, op);
        }
      }
      Operator<? extends Serializable> reducer =
          ((MapredWork) task.getWork()).getReducer();
      if (reducer != null) {
        removeOperatorCountersFromOp(task, reducer);
      }
    } else if (task instanceof ConditionalTask) {
      List<Task<? extends Serializable>> listTasks =
          ((ConditionalTask) task).getListTasks();
      for (Task<? extends Serializable> tsk : listTasks) {
        removeOperatorCountersFromTask(tsk);
      }
    }
    if (task.getChildTasks() == null) {
      return;
    }
    for (Task<? extends Serializable> childTask : task.getChildTasks()) {
      removeOperatorCountersFromTask(childTask);
    }
  }

  private void removeOperatorCountersFromOp(Task<? extends Serializable> task,
      Operator<? extends Serializable> op) {
    HashMap<String, ProgressCounter> counterNameToEnum = op.getCounterNameToEnum();
    if (counterNameToEnum == null || counterNameToEnum.size() == 0) {
      LOG.info("No counters to remove from operator " + op);
    } else {
      LOG.info("Removing " + counterNameToEnum.size() + " counters from operator "
          + op + " in task " + task);
      // op.setCounterNames(new ArrayList<String>());
      op.setCounterNameToEnum(null);
    }
    if (op.getChildOperators() == null) {
      return;
    }
    for (Operator<? extends Serializable> child : op.getChildOperators()) {
      removeOperatorCountersFromOp(task, child);
    }
  }
}

On 17 June 2013 19:48, Ted Xu <t...@gopivotal.com> wrote:
> Hi Shaun,
>
> Your findings are valid. Hive uses Hadoop job counters to report fatal
> errors, so the client can kill the MapReduce job before it completes.
>
> With regard to your case, because Hive wants to kill the MapReduce job
> when there are too many partitions using Dynamic Partitioning, counter
> reporting is forced on. IMHO, fatal error reporting should not depend on
> the "job progress" switch. You can file a JIRA ticket on this one.
>
> On Fri, Jun 7, 2013 at 1:55 PM, Shaun Clowes <sclo...@atlassian.com> wrote:
>
>> Hi Ted, All,
>>
>> Unfortunately profiling turns out to be extremely slow, so it's not very
>> fruitful for determining what's going on here.
>>
>> On the other hand, I seem to have traced this problem down to the
>> "hive.task.progress" configuration variable. When this is set to true (as
>> it is automatically when a dynamic partition insert is used), the insert
>> is drastically slower than it is otherwise.
>>
>> In SemanticAnalyzer.java it forces this task tracking on as follows:
>>
>>   // turn on hive.task.progress to update # of partitions created to the JT
>>   HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVEJOBPROGRESS, true);
>>
>> Does anyone know why this must be turned on? What is the need for the
>> number of partitions created to be reported? The end result is a lot more
>> than just the number of partitions having their statistics reported.
>>
>> I'm not sure why the insert is so very slow when it's on; perhaps the
>> retrieval of the current time in millis in Operator.java:
>>
>> 1076   /**
>> 1077    * this is called after operator process to buffer some counters.
>> 1078    */
>> 1079   private void postProcessCounter() {
>> 1080     if (counterNameToEnum != null) {
>> 1081       totalTime += (System.currentTimeMillis() - beginTime);
>> 1082     }
>> 1083   }
>>
>> Thanks,
>> Shaun
>>
>> On 6 June 2013 19:00, Ted Xu <t...@gopivotal.com> wrote:
>>
>>> Hi Shaun,
>>>
>>> This is weird. I'm not sure if there is any other reason (e.g., a very
>>> complex UDF?) that caused this issue, but it would be best if you can do
>>> profiling <http://hadoop.apache.org/docs/stable/mapred_tutorial.html#Profiling>
>>> to see if there is a hot spot.
>>>
>>> On Thu, Jun 6, 2013 at 4:38 PM, Shaun Clowes <sclo...@atlassian.com> wrote:
>>>
>>>> Hi Ted,
>>>>
>>>> It's actually just one partition being created, which is what makes it
>>>> so weird.
>>>>
>>>> Thanks,
>>>> Shaun
>>>>
>>>> On 6 June 2013 18:36, Ted Xu <t...@gopivotal.com> wrote:
>>>>
>>>>> Hi Shaun,
>>>>>
>>>>> Too many partitions in dynamic partitioning may slow down the
>>>>> mapreduce job. Can you estimate how many partitions will be generated
>>>>> after the insert?
>>>>>
>>>>> On Thu, Jun 6, 2013 at 4:24 PM, Shaun Clowes <sclo...@atlassian.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> Does anyone know the performance impact dynamic partitions should
>>>>>> be expected to have?
>>>>>>
>>>>>> I have a table that is partitioned by a string in the form 'YYYY-MM'.
>>>>>> When I insert into this table (from an external table that is just an
>>>>>> S3 bucket containing gzipped logs) using dynamic partitioning, I get
>>>>>> very slow performance, with each node in the cluster unable to process
>>>>>> more than 2MB per second. When I run the exact same query with static
>>>>>> partition values I get more like 30-40MB/s on each node.
>>>>>>
>>>>>> I've never seen this type of problem with our internal cluster
>>>>>> running Hive 0.7.1 (CDH3u4), but it happens every time in EMR.
>>>>>>
>>>>>> Thanks,
>>>>>> Shaun
>>>>>>
>>>>> --
>>>>> Regards,
>>>>> Ted Xu
>>>>
>>> --
>>> Regards,
>>> Ted Xu
>>
> --
> Regards,
> Ted Xu
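As an aside, the per-row overhead Shaun suspects in postProcessCounter() can be illustrated with a standalone micro-benchmark. This is a hypothetical sketch (the class name and row count are my own invention, and it is not Hive code): it compares a loop that does trivial per-row work against the same loop with a beginTime/totalTime pair of System.currentTimeMillis() calls per row, mirroring the pattern quoted from Operator.java. Whether this alone explains the slowdown will depend on the JVM and platform.

```java
// Hypothetical standalone sketch, not part of Hive. Mimics the
// counterNameToEnum != null branch of Operator.postProcessCounter():
// two System.currentTimeMillis() calls per processed row.
public class CounterOverheadSketch {
  public static void main(String[] args) {
    final int rows = 5_000_000;
    long checksum = 0;

    // Baseline: counters disabled, no per-row timing work.
    long start = System.nanoTime();
    for (int i = 0; i < rows; i++) {
      checksum += i; // stand-in for real row processing
    }
    long baselineNanos = System.nanoTime() - start;

    // Counters enabled: a beginTime/totalTime pair per row, as in
    // the quoted postProcessCounter() snippet.
    long totalTime = 0;
    start = System.nanoTime();
    for (int i = 0; i < rows; i++) {
      long beginTime = System.currentTimeMillis();
      checksum += i;
      totalTime += (System.currentTimeMillis() - beginTime);
    }
    long countedNanos = System.nanoTime() - start;

    System.out.println("baseline ms: " + baselineNanos / 1_000_000);
    System.out.println("with timing ms: " + countedNanos / 1_000_000);
    System.out.println("accumulated counter time: " + totalTime);
    System.out.println("checksum: " + checksum);
  }
}
```

Timings are deliberately not asserted, since currentTimeMillis() cost varies widely between platforms; the sketch only makes the shape of the overhead concrete.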