I found another criterion that determines whether or not the merge job
runs with compression turned on.  It seems that if the target table is
stored as an rcfile, merges work, but if a text file, merges will
fail.  For instance:

-- merge will work here:
create table  alogs_dbg_sample3 (server_host STRING, client_ip INT,
time_stamp INT) row format serde
'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe' stored as
rcfile;
insert overwrite table alogs_dbg_sample3 select
server_host,client_ip,time_stamp from alogs_dbg_subset
TABLESAMPLE(BUCKET 1 OUT OF 1000 ON rand()) s;

-- merge will fail here:
create table alogs_dbg_sample2 (server_host STRING, client_ip INT,
time_stamp INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES
TERMINATED BY '\n' STORED AS TEXTFILE;
insert overwrite table alogs_dbg_sample2 select
server_host,client_ip,time_stamp from alogs_dbg_subset
TABLESAMPLE(BUCKET 1 OUT OF 1000 ON rand()) s;

Is this a bug?..  I don't see why merge job running should be
sensitive to the output table format.
P.S:  I have hive.merge.maponly=true and am using LZO compression

On Fri, Nov 19, 2010 at 5:20 PM, Ning Zhang <nzh...@fb.com> wrote:
> It makes sense. CombineHiveInputFormat does not work with compressed text 
> files (suffix *.gz) since it is not splittable. I think your default 
> hive.file.format=CombineHiveInputFormat. But I think by setting 
> hive.merge.maponly it should work (meaning merge should be succeeded). By 
> setting hive.merge.maponly, you'll have multiple mappers (the same # of small 
> files) and 1 reducer. The reducer's output should be the merged result.
>
> On Nov 19, 2010, at 1:22 PM, Leo Alekseyev wrote:
>
>> Folks, thanks for your help.  I've narrowed the problem down to
>> compression.  When I set hive.exec.compress.output=false, merges
>> proceed as expected.  When compression is on, the merge job doesn't
>> seem to actually merge, it just spits out the input.
>>
>> On Fri, Nov 19, 2010 at 10:51 AM, yongqiang he <heyongqiang...@gmail.com> 
>> wrote:
>>> These are the parameters that control the behavior. (Try to set them
>>> to different values if it does not work in your environment.)
>>>
>>> set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
>>> set mapred.min.split.size.per.node=1000000000;
>>> set mapred.min.split.size.per.rack=1000000000;
>>> set mapred.max.split.size=1000000000;
>>>
>>> set hive.merge.size.per.task=1000000000;
>>> set hive.merge.smallfiles.avgsize=1000000000;
>>> set hive.merge.size.smallfiles.avgsize=1000000000;
>>> set hive.exec.dynamic.partition.mode=nonstrict;
>>>
>>>
>>> The output size of the second job is also controlled by the split
>>> size, as shown in the first 4 lines.
>>>
>>>
>>> On Fri, Nov 19, 2010 at 10:22 AM, Leo Alekseyev <dnqu...@gmail.com> wrote:
>>>> I'm using Hadoop 0.20.2.  Merge jobs (with static partitions) have
>>>> worked for me in the past.  Again, what's strange here is with the
>>>> latest Hive build the merge stage appears to run, but it doesn't
>>>> actually merge -- it's a quick map-only job that, near as I can tell,
>>>> doesn't do anything.
>>>>
>>>> On Fri, Nov 19, 2010 at 6:14 AM, Dave Brondsema <dbronds...@geek.net> 
>>>> wrote:
>>>>> What version of Hadoop are you on?
>>>>>
>>>>> On Thu, Nov 18, 2010 at 10:48 PM, Leo Alekseyev <dnqu...@gmail.com> wrote:
>>>>>>
>>>>>> I thought I was running Hive with those changes merged in, but to make
>>>>>> sure, I built the latest trunk version.  The behavior changed somewhat
>>>>>> (as in, it runs 2 stages instead of 1), but it still generates the
>>>>>> same number of files (# of files generated is equal to the number of
>>>>>> the original mappers, so I have no idea what the second stage is
>>>>>> actually doing).
>>>>>>
>>>>>> See below for query / explain query.  Stage 1 runs always; Stage 3
>>>>>> runs if hive.merge.mapfiles=true is set, but it still generates lots
>>>>>> of small files.
>>>>>>
>>>>>> The query is kind of large, but in essence it's simply
>>>>>> insert overwrite table foo partition(bar) select [columns] from
>>>>>> [table] tablesample(bucket 1 out of 10000 on rand()) where
>>>>>> [conditions].
>>>>>>
>>>>>>
>>>>>> explain insert overwrite table hbase_prefilter3_us_sample partition
>>>>>> (ds) select
>>>>>> server_host,client_ip,time_stamp,concat(server_host,':',regexp_extract(request_url,'/[^/]+/[^/]+/([^/]+)$',1)),referrer,parse_url(referrer,'HOST'),user_agent,cookie,geoip_int(client_ip,
>>>>>> 'COUNTRY_CODE',  './GeoIP.dat'),'',ds from alogs_master
>>>>>> TABLESAMPLE(BUCKET 1 OUT OF 10000 ON rand()) am_s where
>>>>>> am_s.ds='2010-11-05' and am_s.request_url rlike
>>>>>> '^/img[0-9]+/[0-9]+/[^.]+\.(png|jpg|gif|mp4|swf)$' and
>>>>>> geoip_int(am_s.client_ip, 'COUNTRY_CODE',  './GeoIP.dat')='US';
>>>>>> OK
>>>>>> ABSTRACT SYNTAX TREE:
>>>>>>  (TOK_QUERY (TOK_FROM (TOK_TABREF alogs_master (TOK_TABLESAMPLE 1
>>>>>> 10000 (TOK_FUNCTION rand)) am_s)) (TOK_INSERT (TOK_DESTINATION
>>>>>> (TOK_TAB hbase_prefilter3_us_sample (TOK_PARTSPEC (TOK_PARTVAL ds))))
>>>>>> (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL server_host)) (TOK_SELEXPR
>>>>>> (TOK_TABLE_OR_COL client_ip)) (TOK_SELEXPR (TOK_TABLE_OR_COL
>>>>>> time_stamp)) (TOK_SELEXPR (TOK_FUNCTION concat (TOK_TABLE_OR_COL
>>>>>> server_host) ':' (TOK_FUNCTION regexp_extract (TOK_TABLE_OR_COL
>>>>>> request_url) '/[^/]+/[^/]+/([^/]+)$' 1))) (TOK_SELEXPR
>>>>>> (TOK_TABLE_OR_COL referrer)) (TOK_SELEXPR (TOK_FUNCTION parse_url
>>>>>> (TOK_TABLE_OR_COL referrer) 'HOST')) (TOK_SELEXPR (TOK_TABLE_OR_COL
>>>>>> user_agent)) (TOK_SELEXPR (TOK_TABLE_OR_COL cookie)) (TOK_SELEXPR
>>>>>> (TOK_FUNCTION geoip_int (TOK_TABLE_OR_COL client_ip) 'COUNTRY_CODE'
>>>>>> './GeoIP.dat')) (TOK_SELEXPR '') (TOK_SELEXPR (TOK_TABLE_OR_COL ds)))
>>>>>> (TOK_WHERE (and (and (= (. (TOK_TABLE_OR_COL am_s) ds) '2010-11-05')
>>>>>> (rlike (. (TOK_TABLE_OR_COL am_s) request_url)
>>>>>> '^/img[0-9]+/[0-9]+/[^.]+\.(png|jpg|gif|mp4|swf)$')) (= (TOK_FUNCTION
>>>>>> geoip_int (. (TOK_TABLE_OR_COL am_s) client_ip) 'COUNTRY_CODE'
>>>>>> './GeoIP.dat') 'US')))))
>>>>>>
>>>>>> STAGE DEPENDENCIES:
>>>>>>  Stage-1 is a root stage
>>>>>>  Stage-5 depends on stages: Stage-1 , consists of Stage-4, Stage-3
>>>>>>  Stage-4
>>>>>>  Stage-0 depends on stages: Stage-4, Stage-3
>>>>>>  Stage-2 depends on stages: Stage-0
>>>>>>  Stage-3
>>>>>>
>>>>>> STAGE PLANS:
>>>>>>  Stage: Stage-1
>>>>>>    Map Reduce
>>>>>>      Alias -> Map Operator Tree:
>>>>>>        am_s
>>>>>>          TableScan
>>>>>>            alias: am_s
>>>>>>            Filter Operator
>>>>>>              predicate:
>>>>>>                  expr: (((hash(rand()) & 2147483647) % 10000) = 0)
>>>>>>                  type: boolean
>>>>>>              Filter Operator
>>>>>>                predicate:
>>>>>>                    expr: ((request_url rlike
>>>>>> '^/img[0-9]+/[0-9]+/[^.]+.(png|jpg|gif|mp4|swf)$') and
>>>>>> (GenericUDFGeoIP ( client_ip, 'COUNTRY_CODE', './GeoIP.dat' ) = 'US'))
>>>>>>                    type: boolean
>>>>>>                Filter Operator
>>>>>>                  predicate:
>>>>>>                      expr: (((ds = '2010-11-05') and (request_url
>>>>>> rlike '^/img[0-9]+/[0-9]+/[^.]+.(png|jpg|gif|mp4|swf)$')) and
>>>>>> (GenericUDFGeoIP ( client_ip, 'COUNTRY_CODE', './GeoIP.dat' ) = 'US'))
>>>>>>                      type: boolean
>>>>>>                  Select Operator
>>>>>>                    expressions:
>>>>>>                          expr: server_host
>>>>>>                          type: string
>>>>>>                          expr: client_ip
>>>>>>                          type: int
>>>>>>                          expr: time_stamp
>>>>>>                          type: int
>>>>>>                          expr: concat(server_host, ':',
>>>>>> regexp_extract(request_url, '/[^/]+/[^/]+/([^/]+)$', 1))
>>>>>>                          type: string
>>>>>>                          expr: referrer
>>>>>>                          type: string
>>>>>>                          expr: parse_url(referrer, 'HOST')
>>>>>>                          type: string
>>>>>>                          expr: user_agent
>>>>>>                          type: string
>>>>>>                          expr: cookie
>>>>>>                          type: string
>>>>>>                          expr: GenericUDFGeoIP ( client_ip,
>>>>>> 'COUNTRY_CODE', './GeoIP.dat' )
>>>>>>                          type: string
>>>>>>                          expr: ''
>>>>>>                          type: string
>>>>>>                          expr: ds
>>>>>>                          type: string
>>>>>>                    outputColumnNames: _col0, _col1, _col2, _col3,
>>>>>> _col4, _col5, _col6, _col7, _col8, _col9, _col10
>>>>>>                    File Output Operator
>>>>>>                      compressed: true
>>>>>>                      GlobalTableId: 1
>>>>>>                      table:
>>>>>>                          input format:
>>>>>> org.apache.hadoop.mapred.TextInputFormat
>>>>>>                          output format:
>>>>>> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>>>>                          serde:
>>>>>> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>>>>>>                          name: hbase_prefilter3_us_sample
>>>>>>
>>>>>>  Stage: Stage-5
>>>>>>    Conditional Operator
>>>>>>
>>>>>>  Stage: Stage-4
>>>>>>    Move Operator
>>>>>>      files:
>>>>>>          hdfs directory: true
>>>>>>          destination:
>>>>>>
>>>>>> hdfs://namenode.imageshack.us:9000/tmp/hive-hadoop/hive_2010-11-18_17-58-36_843_6726655151866456030/-ext-10000
>>>>>>
>>>>>>  Stage: Stage-0
>>>>>>    Move Operator
>>>>>>      tables:
>>>>>>          partition:
>>>>>>            ds
>>>>>>          replace: true
>>>>>>          table:
>>>>>>              input format: org.apache.hadoop.mapred.TextInputFormat
>>>>>>              output format:
>>>>>> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>>>>              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>>>>>>              name: hbase_prefilter3_us_sample
>>>>>>
>>>>>>  Stage: Stage-2
>>>>>>    Stats-Aggr Operator
>>>>>>
>>>>>>  Stage: Stage-3
>>>>>>    Map Reduce
>>>>>>      Alias -> Map Operator Tree:
>>>>>>
>>>>>>  hdfs://namenode.imageshack.us:9000/tmp/hive-hadoop/hive_2010-11-18_17-58-36_843_6726655151866456030/-ext-10002
>>>>>>            File Output Operator
>>>>>>              compressed: true
>>>>>>              GlobalTableId: 0
>>>>>>              table:
>>>>>>                  input format: org.apache.hadoop.mapred.TextInputFormat
>>>>>>                  output format:
>>>>>> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>>>>                  serde: 
>>>>>> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>>>>>>                  name: hbase_prefilter3_us_sample
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Nov 18, 2010 at 3:44 PM, Ning Zhang <nzh...@fb.com> wrote:
>>>>>>> I see. If you are using dynamic partitions, HIVE-1307 and HIVE-1622 need
>>>>>>> to be there for merging to take place. HIVE-1307 was committed to trunk 
>>>>>>> on
>>>>>>> 08/25 and HIVE-1622 was committed on 09/13. The simplest way is to 
>>>>>>> update
>>>>>>> your Hive trunk and rerun the query. If it still doesn't work maybe you 
>>>>>>> can
>>>>>>> post your query and the result of 'explain <query>' and we can take a 
>>>>>>> look.
>>>>>>>
>>>>>>> Ning
>>>>>>>
>>>>>>> On Nov 18, 2010, at 2:57 PM, Leo Alekseyev wrote:
>>>>>>>
>>>>>>>> Hi Ning,
>>>>>>>> For the dataset I'm experimenting with, the total size of the output
>>>>>>>> is 2mb, and the files are at most a few kb in size.  My
>>>>>>>> hive.input.format was set to default HiveInputFormat; however, when I
>>>>>>>> set it to CombineHiveInputFormat, it only made the first stage of the
>>>>>>>> job use fewer mappers.  The merge job was *still* filtered out at
>>>>>>>> runtime.  I also tried set hive.mergejob.maponly=false; that didn't
>>>>>>>> have any effect.
>>>>>>>>
>>>>>>>> I am a bit at a loss what to do here.  Is there a way to see what's
>>>>>>>> going on exactly using e.g. debug log levels?..  Btw, I'm also using
>>>>>>>> dynamic partitions; could that somehow be interfering with the merge
>>>>>>>> job?..
>>>>>>>>
>>>>>>>> I'm running a relatively fresh Hive from trunk (built maybe a month
>>>>>>>> ago).
>>>>>>>>
>>>>>>>> --Leo
>>>>>>>>
>>>>>>>> On Thu, Nov 18, 2010 at 1:12 PM, Ning Zhang <nzh...@fb.com> wrote:
>>>>>>>>> The settings looks good. The parameter
>>>>>>>>> hive.merge.size.smallfiles.avgsize is used to determine at run time 
>>>>>>>>> if a
>>>>>>>>> merge should be triggered: if the average size of the files in the 
>>>>>>>>> partition
>>>>>>>>> is SMALLER than the parameter and there are more than 1 file, the 
>>>>>>>>> merge
>>>>>>>>> should be scheduled. Can you try to see if you have any big files as 
>>>>>>>>> well in
>>>>>>>>> your resulting partition? If it is because of a very large file, you 
>>>>>>>>> can set
>>>>>>>>> the parameter large enough.
>>>>>>>>>
>>>>>>>>> Another possibility is that your Hadoop installation does not support
>>>>>>>>> CombineHiveInputFormat, which is used for the new merge job. Someone
>>>>>>>>> reported previously merge was not successful because of this. If 
>>>>>>>>> that's the
>>>>>>>>> case, you can turn off CombineHiveInputFormat and use the old
>>>>>>>>> HiveInputFormat (though slower) by setting 
>>>>>>>>> hive.mergejob.maponly=false.
>>>>>>>>>
>>>>>>>>> Ning
>>>>>>>>> On Nov 17, 2010, at 6:00 PM, Leo Alekseyev wrote:
>>>>>>>>>
>>>>>>>>>> I have jobs that sample (or generate) a small amount of data from a
>>>>>>>>>> large table.  At the end, I get e.g. about 3000 or more files of 1kb
>>>>>>>>>> or so.  This becomes a nuisance.  How can I make Hive do another pass
>>>>>>>>>> to merge the output?  I have the following settings:
>>>>>>>>>>
>>>>>>>>>> hive.merge.mapfiles=true
>>>>>>>>>> hive.merge.mapredfiles=true
>>>>>>>>>> hive.merge.size.per.task=256000000
>>>>>>>>>> hive.merge.size.smallfiles.avgsize=16000000
>>>>>>>>>>
>>>>>>>>>> After setting hive.merge* to true, Hive started indicating "Total
>>>>>>>>>> MapReduce jobs = 2".  However, after generating the
>>>>>>>>>> lots-of-small-files table, Hive says:
>>>>>>>>>> Ended Job = job_201011021934_1344
>>>>>>>>>> Ended Job = 781771542, job is filtered out (removed at runtime).
>>>>>>>>>>
>>>>>>>>>> Is there a way to force the merge, or am I missing something?
>>>>>>>>>> --Leo
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Dave Brondsema
>>>>> Software Engineer
>>>>> Geeknet
>>>>>
>>>>> www.geek.net
>>>>>
>>>>
>>>
>
>

Reply via email to