Thanks for following up, Ted. I couldn't work out why progress tracking
was being forced on for dynamic partition inserts, so your explanation was
very helpful. I'll raise a JIRA issue for the problem. Do you have any
ideas for an alternative approach? I could have a go at implementing a
fix, but I'm not sure what a better alternative would look like.

In the meantime I've implemented a semantic analyzer hook that removes the
counters added to the operators. The source is below in case anyone else
finds it useful. To use it:

- Start Hive with the jar loaded: "export HIVE_AUX_JARS_PATH=path/to/jar"
- Add the hook: "set hive.semantic.analyzer.hook=com.atlassian.hive.RemoveCountersHook;"
  (a full example session is sketched below)
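
For example, a complete session might look like this (the jar location and
the table/column names here are only placeholders, substitute your own):

   $ export HIVE_AUX_JARS_PATH=/home/hadoop/lib/remove-counters-hook.jar
   $ hive
   hive> set hive.semantic.analyzer.hook=com.atlassian.hive.RemoveCountersHook;
   hive> INSERT OVERWRITE TABLE logs PARTITION (month)
       > SELECT host, request, month FROM raw_logs;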

Thanks,
Shaun

/*
 * Remove counters from tasks (this drastically speeds up dynamic
 * partition inserts in Amazon EMR). The counters are enabled because
 * hive.task.progress is forced on in SemanticAnalyzer.java for
 * dynamic partition insert queries. Ted Xu explains that this is
 * simply so that the job can be killed if the maximum number of
 * dynamic partitions is exceeded, in the following mailing list
 * message:
 *
 * http://mail-archives.apache.org/mod_mbox/hive-user/201306.mbox/%3CCAP9%2B16xbCfvCc%3DgiKW4a9GQ588sZhGYiiKH7DS5CH9nr07i-ug%40mail.gmail.com%3E
 *
 * If you are sure the limit will not be exceeded, removing the
 * counters is extremely beneficial.
 *
 * Copyright © 2013 Atlassian Corporation Pty Ltd. Licensed
 * under the Apache License, Version 2.0 (the "License"); you may not use
 * this file except in compliance with the License. You may obtain a copy of
 * the License at http://www.apache.org/licenses/LICENSE-2.0. Unless
 * required by applicable law or agreed to in writing, software distributed
 * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
 * OR CONDITIONS OF ANY KIND, either express or implied. See the License
 * for the specific language governing permissions and limitations under
 * the License.
 *
 * There are three types of hooks, all of which are executed in Driver.java
 * in the Hive driver/CLI process (i.e. not inside the map/reduce tasks):
 *
 *   - Semantic analyzer hooks - Declared with comma separated class
 *     names in hive.semantic.analyzer.hook. Called both before and
 *     after semantic analysis is completed. Extend
 *     AbstractSemanticAnalyzerHook.
 *   - Pre hooks - Declared with comma separated class names in
 *     hive.exec.pre.hooks. Run before the job is submitted; can
 *     implement ExecuteWithHookContext or PreExecute.
 *   - Post hooks - Declared with comma separated class names in
 *     hive.exec.post.hooks. Run after the job has completed; can
 *     implement ExecuteWithHookContext or PostExecute.
 */
package com.atlassian.hive;

import java.io.Serializable;
import java.util.HashMap;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.ExecDriver;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.AbstractSemanticAnalyzerHook;
import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.MapredWork;

public class RemoveCountersHook extends AbstractSemanticAnalyzerHook {

   private static final Log LOG = LogFactory.getLog(RemoveCountersHook.class);

   @Override
   public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context, ASTNode ast)
         throws SemanticException {
      LOG.info("RemoveCountersHook called for preAnalyze");
      return ast;
   }


   @Override
   public void postAnalyze(HiveSemanticAnalyzerHookContext context,
         List<Task<? extends Serializable>> rootTasks) throws SemanticException {
      LOG.info("RemoveCountersHook called for postAnalyze");
      LOG.info("Context " + context);
      LOG.info("Root tasks " + rootTasks);

      // Walk every root task (and recursively its children) and strip the
      // per-operator counters that hive.task.progress switches on.
      for (Task<? extends Serializable> tsk : rootTasks) {
         removeOperatorCountersFromTask(tsk);
      }
   }

   public void removeOperatorCountersFromTask(Task<? extends Serializable> task) {
      Operator.resetLastEnumUsed();

      if (task instanceof ExecDriver) {
         // A map/reduce task: clear counters on every map-side operator
         // tree, and on the reducer tree if there is one.
         HashMap<String, Operator<? extends Serializable>> opMap =
               ((MapredWork) task.getWork()).getAliasToWork();
         if (!opMap.isEmpty()) {
            for (Operator<? extends Serializable> op : opMap.values()) {
               removeOperatorCountersFromOp(task, op);
            }
         }

         Operator<? extends Serializable> reducer =
               ((MapredWork) task.getWork()).getReducer();
         if (reducer != null) {
            removeOperatorCountersFromOp(task, reducer);
         }
      } else if (task instanceof ConditionalTask) {
         List<Task<? extends Serializable>> listTasks =
               ((ConditionalTask) task).getListTasks();
         for (Task<? extends Serializable> tsk : listTasks) {
            removeOperatorCountersFromTask(tsk);
         }
      }

      if (task.getChildTasks() == null) {
         return;
      }

      for (Task<? extends Serializable> childTask : task.getChildTasks()) {
         removeOperatorCountersFromTask(childTask);
      }
   }

   private void removeOperatorCountersFromOp(Task<? extends Serializable> task,
         Operator<? extends Serializable> op) {
      HashMap<String, ProgressCounter> counterNameToEnum = op.getCounterNameToEnum();
      if (counterNameToEnum == null || counterNameToEnum.size() == 0) {
         LOG.info("No counters to remove from operator " + op);
      } else {
         LOG.info("Removing " + counterNameToEnum.size() + " counters from operator "
               + op + " in task " + task);
         // With the map cleared, Operator skips its per-row counter
         // bookkeeping (see postProcessCounter in Operator.java).
         // op.setCounterNames(new ArrayList<String>());
         op.setCounterNameToEnum(null);
      }

      if (op.getChildOperators() == null) {
         return;
      }

      for (Operator<? extends Serializable> child : op.getChildOperators()) {
         removeOperatorCountersFromOp(task, child);
      }
   }
}
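
P.S. The pre/post execution hooks mentioned in the header comment are even
simpler to write. Below is a minimal sketch using ExecuteWithHookContext
(untested and illustrative only: the class name is made up, and the
HookContext method name is from memory, so check it against your Hive
version):

package com.atlassian.hive;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;

public class LogHookTypeHook implements ExecuteWithHookContext {

   private static final Log LOG = LogFactory.getLog(LogHookTypeHook.class);

   @Override
   public void run(HookContext hookContext) throws Exception {
      // Runs in the driver process, not inside the map/reduce tasks.
      // getHookType() reports whether this invocation came from
      // hive.exec.pre.hooks or hive.exec.post.hooks.
      LOG.info("Hook invoked as " + hookContext.getHookType());
   }
}

It would be enabled with "set
hive.exec.post.hooks=com.atlassian.hive.LogHookTypeHook;" (or pre.hooks).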




On 17 June 2013 19:48, Ted Xu <t...@gopivotal.com> wrote:

> Hi Shaun,
>
> Your findings are valid. Hive uses Hadoop job counters to report fatal
> errors, so the client can kill the MapReduce job before it completes.
>
> With regard to your case: because Hive wants to kill the MapReduce job
> when too many partitions are created by dynamic partitioning, counter
> reporting is forcibly enabled. IMHO, fatal error reporting should not
> depend on the "job progress" switch. You can file a JIRA ticket on this one.
>
>
> On Fri, Jun 7, 2013 at 1:55 PM, Shaun Clowes <sclo...@atlassian.com> wrote:
>
>> Hi Ted, All,
>>
>> Unfortunately profiling turns out to be extremely slow, so it's not very
>> fruitful for determining what's going on here.
>>
>> On the other hand I seem to have traced this problem down to the
>> "hive.task.progress" configuration variable. When this is set to true (as
>> it is automatically when a dynamic partition insert is used), the insert is
>> drastically slower than it is otherwise.
>>
>> In SemanticAnalyzer.java it forces this task tracking on as follows:
>>
>>           // turn on hive.task.progress to update # of partitions created
>>           // to the JT
>>           HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVEJOBPROGRESS,
>>               true);
>>
>> Does anyone know why this must be turned on? What is the need for the
>> number of partitions created to be reported? The end result is that far
>> more than just the number of partitions gets reported.
>>
>> I'm not sure why the insert is so slow when it's on; perhaps it's the
>> retrieval of the current time in millis in Operator.java:
>>
>> 1076   /**
>> 1077    * this is called after operator process to buffer some counters.
>> 1078    */
>> 1079   private void postProcessCounter() {
>> 1080     if (counterNameToEnum != null) {
>> 1081       totalTime += (System.currentTimeMillis() - beginTime);
>> 1082     }
>> 1083   }
>>
>> Thanks,
>> Shaun
>>
>>
>> On 6 June 2013 19:00, Ted Xu <t...@gopivotal.com> wrote:
>>
>>> Hi Shaun,
>>>
>>> This is weird. I'm not sure if there are any other reasons (e.g., a very
>>> complex UDF?) causing this issue, but it would be best if you could do a
>>> profiling<http://hadoop.apache.org/docs/stable/mapred_tutorial.html#Profiling>,
>>> and see if there is a hot spot.
>>>
>>>
>>> On Thu, Jun 6, 2013 at 4:38 PM, Shaun Clowes <sclo...@atlassian.com> wrote:
>>>
>>>> Hi Ted,
>>>>
>>>> It's actually just one partition being created, which is what makes it
>>>> so weird.
>>>>
>>>> Thanks,
>>>> Shaun
>>>>
>>>>
>>>> On 6 June 2013 18:36, Ted Xu <t...@gopivotal.com> wrote:
>>>>
>>>>> Hi Shaun,
>>>>>
>>>>> Too many partitions in dynamic partitioning may slow down the
>>>>> mapreduce job. Can you estimate how many partitions will be generated
>>>>> after the insert?
>>>>>
>>>>>
>>>>> On Thu, Jun 6, 2013 at 4:24 PM, Shaun Clowes <sclo...@atlassian.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> Does anyone know what performance impact dynamic partitions should be
>>>>>> expected to have?
>>>>>>
>>>>>> I have a table that is partitioned by a string in the form 'YYYY-MM'.
>>>>>> When I insert into this table (from an external table that is just an
>>>>>> S3 bucket containing gzipped logs) using dynamic partitioning, I get
>>>>>> very slow performance, with each node in the cluster unable to process
>>>>>> more than 2MB per second. When I run the exact same query with static
>>>>>> partition values I get more like 30-40MB/s on each node.
>>>>>>
>>>>>> I've never seen this type of problem with our internal cluster
>>>>>> running Hive 0.7.1 (CDH3u4), but it happens every time in EMR.
>>>>>>
>>>>>> Thanks,
>>>>>> Shaun
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Regards,
>>>>> Ted Xu
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Regards,
>>> Ted Xu
>>>
>>
>>
>
>
> --
> Regards,
> Ted Xu
>
