I thought I was running Hive with those changes merged in, but to make
sure, I built the latest trunk version.  The behavior changed somewhat
(it now runs 2 stages instead of 1), but it still generates the same
number of files: the number of files generated equals the number of
original mappers, so I have no idea what the second stage is actually
doing.

See below for the query and its explain output.  Stage 1 always runs;
Stage 3 runs if hive.merge.mapfiles=true is set, but the job still
generates lots of small files.
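
For reference, here is how I read the run-time check Ning described in
the message quoted further down (merge when there is more than one file
and the average file size is below hive.merge.size.smallfiles.avgsize).
This is only a sketch of that description, not Hive's actual code; the
function name and arguments are made up for illustration.

```python
def should_schedule_merge(file_sizes, avg_size_threshold):
    """Return True if a merge should be scheduled for one partition.

    Hypothetical helper: encodes the rule as described in this thread,
    not the real Hive implementation.
    """
    # A single file (or none) leaves nothing to merge.
    if len(file_sizes) <= 1:
        return False
    average = sum(file_sizes) / len(file_sizes)
    # Merge only when the average file is smaller than the threshold.
    return average < avg_size_threshold


AVG_SIZE = 16000000  # hive.merge.size.smallfiles.avgsize from this thread

# Roughly my case: about 3000 files of about 1 kB each.
print(should_schedule_merge([1024] * 3000, AVG_SIZE))  # True

# One very large file can pull the average above the threshold.
print(should_schedule_merge([1024] * 10 + [500000000], AVG_SIZE))  # False
```

By this reading, my output (lots of files of a few kB) should qualify
for a merge with the settings I have.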

The query is kind of large, but in essence it's simply

  insert overwrite table foo partition(bar)
  select [columns]
  from [table] tablesample(bucket 1 out of 10000 on rand())
  where [conditions].


explain
insert overwrite table hbase_prefilter3_us_sample partition (ds)
select
  server_host,
  client_ip,
  time_stamp,
  concat(server_host, ':', regexp_extract(request_url, '/[^/]+/[^/]+/([^/]+)$', 1)),
  referrer,
  parse_url(referrer, 'HOST'),
  user_agent,
  cookie,
  geoip_int(client_ip, 'COUNTRY_CODE', './GeoIP.dat'),
  '',
  ds
from alogs_master TABLESAMPLE(BUCKET 1 OUT OF 10000 ON rand()) am_s
where am_s.ds = '2010-11-05'
  and am_s.request_url rlike '^/img[0-9]+/[0-9]+/[^.]+\.(png|jpg|gif|mp4|swf)$'
  and geoip_int(am_s.client_ip, 'COUNTRY_CODE', './GeoIP.dat') = 'US';
OK
ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_TABREF alogs_master (TOK_TABLESAMPLE 1
10000 (TOK_FUNCTION rand)) am_s)) (TOK_INSERT (TOK_DESTINATION
(TOK_TAB hbase_prefilter3_us_sample (TOK_PARTSPEC (TOK_PARTVAL ds))))
(TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL server_host)) (TOK_SELEXPR
(TOK_TABLE_OR_COL client_ip)) (TOK_SELEXPR (TOK_TABLE_OR_COL
time_stamp)) (TOK_SELEXPR (TOK_FUNCTION concat (TOK_TABLE_OR_COL
server_host) ':' (TOK_FUNCTION regexp_extract (TOK_TABLE_OR_COL
request_url) '/[^/]+/[^/]+/([^/]+)$' 1))) (TOK_SELEXPR
(TOK_TABLE_OR_COL referrer)) (TOK_SELEXPR (TOK_FUNCTION parse_url
(TOK_TABLE_OR_COL referrer) 'HOST')) (TOK_SELEXPR (TOK_TABLE_OR_COL
user_agent)) (TOK_SELEXPR (TOK_TABLE_OR_COL cookie)) (TOK_SELEXPR
(TOK_FUNCTION geoip_int (TOK_TABLE_OR_COL client_ip) 'COUNTRY_CODE'
'./GeoIP.dat')) (TOK_SELEXPR '') (TOK_SELEXPR (TOK_TABLE_OR_COL ds)))
(TOK_WHERE (and (and (= (. (TOK_TABLE_OR_COL am_s) ds) '2010-11-05')
(rlike (. (TOK_TABLE_OR_COL am_s) request_url)
'^/img[0-9]+/[0-9]+/[^.]+\.(png|jpg|gif|mp4|swf)$')) (= (TOK_FUNCTION
geoip_int (. (TOK_TABLE_OR_COL am_s) client_ip) 'COUNTRY_CODE'
'./GeoIP.dat') 'US')))))

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-5 depends on stages: Stage-1 , consists of Stage-4, Stage-3
  Stage-4
  Stage-0 depends on stages: Stage-4, Stage-3
  Stage-2 depends on stages: Stage-0
  Stage-3

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        am_s
          TableScan
            alias: am_s
            Filter Operator
              predicate:
                  expr: (((hash(rand()) & 2147483647) % 10000) = 0)
                  type: boolean
              Filter Operator
                predicate:
                    expr: ((request_url rlike '^/img[0-9]+/[0-9]+/[^.]+.(png|jpg|gif|mp4|swf)$') and (GenericUDFGeoIP ( client_ip, 'COUNTRY_CODE', './GeoIP.dat' ) = 'US'))
                    type: boolean
                Filter Operator
                  predicate:
                      expr: (((ds = '2010-11-05') and (request_url rlike '^/img[0-9]+/[0-9]+/[^.]+.(png|jpg|gif|mp4|swf)$')) and (GenericUDFGeoIP ( client_ip, 'COUNTRY_CODE', './GeoIP.dat' ) = 'US'))
                      type: boolean
                  Select Operator
                    expressions:
                          expr: server_host
                          type: string
                          expr: client_ip
                          type: int
                          expr: time_stamp
                          type: int
                          expr: concat(server_host, ':', regexp_extract(request_url, '/[^/]+/[^/]+/([^/]+)$', 1))
                          type: string
                          expr: referrer
                          type: string
                          expr: parse_url(referrer, 'HOST')
                          type: string
                          expr: user_agent
                          type: string
                          expr: cookie
                          type: string
                          expr: GenericUDFGeoIP ( client_ip, 'COUNTRY_CODE', './GeoIP.dat' )
                          type: string
                          expr: ''
                          type: string
                          expr: ds
                          type: string
                    outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10
                    File Output Operator
                      compressed: true
                      GlobalTableId: 1
                      table:
                          input format: org.apache.hadoop.mapred.TextInputFormat
                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                          name: hbase_prefilter3_us_sample

  Stage: Stage-5
    Conditional Operator

  Stage: Stage-4
    Move Operator
      files:
          hdfs directory: true
          destination: hdfs://namenode.imageshack.us:9000/tmp/hive-hadoop/hive_2010-11-18_17-58-36_843_6726655151866456030/-ext-10000

  Stage: Stage-0
    Move Operator
      tables:
          partition:
            ds
          replace: true
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: hbase_prefilter3_us_sample

  Stage: Stage-2
    Stats-Aggr Operator

  Stage: Stage-3
    Map Reduce
      Alias -> Map Operator Tree:
        hdfs://namenode.imageshack.us:9000/tmp/hive-hadoop/hive_2010-11-18_17-58-36_843_6726655151866456030/-ext-10002
            File Output Operator
              compressed: true
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                  name: hbase_prefilter3_us_sample




On Thu, Nov 18, 2010 at 3:44 PM, Ning Zhang <nzh...@fb.com> wrote:
> I see. If you are using dynamic partitions, HIVE-1307 and HIVE-1622 need to 
> be there for merging to take place. HIVE-1307 was committed to trunk on 08/25 
> and HIVE-1622 was committed on 09/13. The simplest way is to update your Hive 
> trunk and rerun the query. If it still doesn't work maybe you can post your 
> query and the result of 'explain <query>' and we can take a look.
>
> Ning
>
> On Nov 18, 2010, at 2:57 PM, Leo Alekseyev wrote:
>
>> Hi Ning,
>> For the dataset I'm experimenting with, the total size of the output
>> is 2mb, and the files are at most a few kb in size.  My
>> hive.input.format was set to default HiveInputFormat; however, when I
>> set it to CombineHiveInputFormat, it only made the first stage of the
>> job use fewer mappers.  The merge job was *still* filtered out at
>> runtime.  I also tried set hive.mergejob.maponly=false; that didn't
>> have any effect.
>>
>> I am a bit at a loss what to do here.  Is there a way to see what's
>> going on exactly using e.g. debug log levels?..  Btw, I'm also using
>> dynamic partitions; could that somehow be interfering with the merge
>> job?..
>>
>> I'm running a relatively fresh Hive from trunk (built maybe a month ago).
>>
>> --Leo
>>
>> On Thu, Nov 18, 2010 at 1:12 PM, Ning Zhang <nzh...@fb.com> wrote:
>>> The settings look good. The parameter hive.merge.size.smallfiles.avgsize 
>>> is used to determine at run time whether a merge should be triggered: if the 
>>> average size of the files in the partition is SMALLER than the parameter 
>>> and there is more than one file, the merge should be scheduled. Can you check 
>>> whether you also have any big files in your resulting partition? If a very 
>>> large file is the cause, you can set the parameter to a large enough value.
>>>
>>> Another possibility is that your Hadoop installation does not support 
>>> CombineHiveInputFormat, which is used for the new merge job. Someone 
>>> previously reported that the merge was not successful because of this. If 
>>> that's the case, you can turn off CombineHiveInputFormat and use the old 
>>> HiveInputFormat (though slower) by setting hive.mergejob.maponly=false.
>>>
>>> Ning
>>> On Nov 17, 2010, at 6:00 PM, Leo Alekseyev wrote:
>>>
>>>> I have jobs that sample (or generate) a small amount of data from a
>>>> large table.  At the end, I get e.g. about 3000 or more files of 1kb
>>>> or so.  This becomes a nuisance.  How can I make Hive do another pass
>>>> to merge the output?  I have the following settings:
>>>>
>>>> hive.merge.mapfiles=true
>>>> hive.merge.mapredfiles=true
>>>> hive.merge.size.per.task=256000000
>>>> hive.merge.size.smallfiles.avgsize=16000000
>>>>
>>>> After setting hive.merge* to true, Hive started indicating "Total
>>>> MapReduce jobs = 2".  However, after generating the
>>>> lots-of-small-files table, Hive says:
>>>> Ended Job = job_201011021934_1344
>>>> Ended Job = 781771542, job is filtered out (removed at runtime).
>>>>
>>>> Is there a way to force the merge, or am I missing something?
>>>> --Leo
>>>
>>>
>
>
