[ 
https://issues.apache.org/jira/browse/HIVE-5945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13863047#comment-13863047
 ] 

Yin Huai commented on HIVE-5945:
--------------------------------

Thanks Navis for the change. date_dim is a native table. Actually, I think the 
problem is in 
org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.getParticipants. 
It uses ctx.getAliasToTask() to get all aliases. However, these aliases do not 
include the aliases that appear in the MapLocalWork (the small tables). So for a 
query like 
{code}
set hive.auto.convert.join.noconditionaltask=false;
select
   i_item_id
FROM store_sales
JOIN item on (store_sales.ss_item_sk = item.i_item_sk)
limit 10;
{code}

The plan is 
{code}
STAGE DEPENDENCIES:
  Stage-5 is a root stage , consists of Stage-6, Stage-1
  Stage-6 has a backup stage: Stage-1
  Stage-3 depends on stages: Stage-6
  Stage-1
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-5
    Conditional Operator

  Stage: Stage-6
    Map Reduce Local Work
      Alias -> Map Local Tables:
        item 
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        item 
          TableScan
            alias: item
            HashTable Sink Operator
              condition expressions:
                0 
                1 {i_item_id}
              handleSkewJoin: false
              keys:
                0 [Column[ss_item_sk]]
                1 [Column[i_item_sk]]
              Position of Big Table: 0

  Stage: Stage-3
    Map Reduce
      Alias -> Map Operator Tree:
        store_sales 
          TableScan
            alias: store_sales
            Map Join Operator
              condition map:
                   Inner Join 0 to 1
              condition expressions:
                0 
                1 {i_item_id}
              handleSkewJoin: false
              keys:
                0 [Column[ss_item_sk]]
                1 [Column[i_item_sk]]
              outputColumnNames: _col26
              Position of Big Table: 0
              Select Operator
                expressions:
                      expr: _col26
                      type: string
                outputColumnNames: _col0
                Limit
                  File Output Operator
                    compressed: false
                    GlobalTableId: 0
                    table:
                        input format: org.apache.hadoop.mapred.TextInputFormat
                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      Local Work:
        Map Reduce Local Work

  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        item 
          TableScan
            alias: item
            Reduce Output Operator
              key expressions:
                    expr: i_item_sk
                    type: int
              sort order: +
              Map-reduce partition columns:
                    expr: i_item_sk
                    type: int
              tag: 1
              value expressions:
                    expr: i_item_id
                    type: string
        store_sales 
          TableScan
            alias: store_sales
            Reduce Output Operator
              key expressions:
                    expr: ss_item_sk
                    type: int
              sort order: +
              Map-reduce partition columns:
                    expr: ss_item_sk
                    type: int
              tag: 0
      Reduce Operator Tree:
        Join Operator
          condition map:
               Inner Join 0 to 1
          condition expressions:
            0 
            1 {VALUE._col1}
          handleSkewJoin: false
          outputColumnNames: _col26
          Select Operator
            expressions:
                  expr: _col26
                  type: string
            outputColumnNames: _col0
            Limit
              File Output Operator
                compressed: false
                GlobalTableId: 0
                table:
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: 10
{code}
The alias "item" will not be in the set returned by getParticipants. Thus, 
the input of sumOfExcept will be 
{code}
aliasToSize: {store_sales=388445409, item=5051899}
aliases: [store_sales]
except: store_sales
{code}
and we then get 0 as the total size of the small tables, because the only 
alias passed in is the excluded big table.
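
To make the effect concrete, here is a minimal sketch of the summation as I 
understand it. It is a stand-in, not the actual Hive source: the class name 
SumOfExceptSketch and the -1 return value for an unknown size are assumptions 
made for illustration. The sizes are the ones shown above.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Illustrative stand-in (not the Hive source): sums the known sizes of all
// participating aliases except the one treated as the big table.
class SumOfExceptSketch {
    static long sumOfExcept(Map<String, Long> aliasToSize, Set<String> aliases, String except) {
        long total = 0;
        for (String alias : aliases) {
            if (alias.equals(except)) {
                continue; // the big table is not part of the small-table sum
            }
            Long size = aliasToSize.get(alias);
            if (size == null) {
                return -1; // assumption: an unknown size is reported as -1
            }
            total += size;
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Long> aliasToSize = new LinkedHashMap<>();
        aliasToSize.put("store_sales", 388445409L);
        aliasToSize.put("item", 5051899L);

        // Aliases as currently reported: "item" is missing.
        Set<String> reported = new LinkedHashSet<>();
        reported.add("store_sales");
        System.out.println(sumOfExcept(aliasToSize, reported, "store_sales")); // prints 0

        // Aliases once the small table is included.
        Set<String> complete = new LinkedHashSet<>(reported);
        complete.add("item");
        System.out.println(sumOfExcept(aliasToSize, complete, "store_sales")); // prints 5051899
    }
}
```

With "item" missing from the alias set, the loop only ever sees the excluded 
alias, so the sum is 0 regardless of how large the small table really is.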

I think in getParticipants we can check the type of each task, and if it is a 
MapRedTask, we can use getWork().getMapWork().getMapLocalWork() to get its 
local work. Then we can get the aliases of the small tables through aliasToWork.
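
A rough sketch of that idea follows. It does not use the real Hive classes: 
TaskStub, MapRedTaskStub, getLocalAliasToWork and ParticipantsSketch are 
hypothetical stand-ins, and getLocalAliasToWork only stands for the 
getWork().getMapWork().getMapLocalWork() chain followed by a lookup of 
aliasToWork. The real method would need to handle the actual task and work 
types.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a generic Hive task.
class TaskStub {
}

// Hypothetical stand-in for MapRedTask. The map models the aliasToWork of the
// task's map-side local work; it is null when the task has no local work.
class MapRedTaskStub extends TaskStub {
    private final Map<String, Object> localAliasToWork;

    MapRedTaskStub(Map<String, Object> localAliasToWork) {
        this.localAliasToWork = localAliasToWork;
    }

    Map<String, Object> getLocalAliasToWork() {
        return localAliasToWork;
    }
}

class ParticipantsSketch {
    // Collects the big-table aliases (keys of aliasToTask) plus the aliases of
    // small tables that only appear in the local work of a map-reduce task.
    static Set<String> getParticipants(Map<String, TaskStub> aliasToTask) {
        Set<String> participants = new LinkedHashSet<>(aliasToTask.keySet());
        for (TaskStub task : aliasToTask.values()) {
            if (task instanceof MapRedTaskStub) {
                Map<String, Object> local = ((MapRedTaskStub) task).getLocalAliasToWork();
                if (local != null) {
                    participants.addAll(local.keySet());
                }
            }
        }
        return participants;
    }

    public static void main(String[] args) {
        // Models Stage-3 above: store_sales is the big table, item is local work.
        Map<String, Object> localWork = new LinkedHashMap<>();
        localWork.put("item", new Object());

        Map<String, TaskStub> aliasToTask = new LinkedHashMap<>();
        aliasToTask.put("store_sales", new MapRedTaskStub(localWork));

        System.out.println(getParticipants(aliasToTask)); // prints [store_sales, item]
    }
}
```

With "item" in the participant set, the size check sees the small table's 
size instead of summing over an alias set that contains only the big table.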

Another minor comment: can you add a comment to the clone method of 
ConditionalResolverCommonJoinCtx explaining why we only want to copy 
aliasToKnownSize?

Thanks :)

> ql.plan.ConditionalResolverCommonJoin.resolveMapJoinTask also sums those 
> tables which are not used in the child of this conditional task.
> -----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: HIVE-5945
>                 URL: https://issues.apache.org/jira/browse/HIVE-5945
>             Project: Hive
>          Issue Type: Bug
>          Components: Query Processor
>    Affects Versions: 0.8.0, 0.9.0, 0.10.0, 0.11.0, 0.12.0, 0.13.0
>            Reporter: Yin Huai
>            Assignee: Navis
>            Priority: Critical
>         Attachments: HIVE-5945.1.patch.txt, HIVE-5945.2.patch.txt, 
> HIVE-5945.3.patch.txt, HIVE-5945.4.patch.txt, HIVE-5945.5.patch.txt, 
> HIVE-5945.6.patch.txt
>
>
> Here is an example
> {code}
> select
>    i_item_id,
>    s_state,
>    avg(ss_quantity) agg1,
>    avg(ss_list_price) agg2,
>    avg(ss_coupon_amt) agg3,
>    avg(ss_sales_price) agg4
> FROM store_sales
> JOIN date_dim on (store_sales.ss_sold_date_sk = date_dim.d_date_sk)
> JOIN item on (store_sales.ss_item_sk = item.i_item_sk)
> JOIN customer_demographics on (store_sales.ss_cdemo_sk = 
> customer_demographics.cd_demo_sk)
> JOIN store on (store_sales.ss_store_sk = store.s_store_sk)
> where
>    cd_gender = 'F' and
>    cd_marital_status = 'U' and
>    cd_education_status = 'Primary' and
>    d_year = 2002 and
>    s_state in ('GA','PA', 'LA', 'SC', 'MI', 'AL')
> group by
>    i_item_id,
>    s_state
> order by
>    i_item_id,
>    s_state
> limit 100;
> {code}
> I turned off noconditionaltask, so I expected 4 Map-only jobs for this 
> query. However, I got 1 Map-only job (joining store_sales and date_dim) and 
> 3 MR jobs (for reduce joins).
> So I checked the conditional task that determines the plan of the join 
> involving item. In ql.plan.ConditionalResolverCommonJoin.resolveMapJoinTask, 
> aliasToFileSizeMap contains all input tables used in this query and the 
> intermediate table generated by joining store_sales and date_dim. So, when we 
> sum the size of all small tables, the size of store_sales (which is around 
> 45GB in my test) is also counted.



