What version of Hadoop are you on? On Thu, Nov 18, 2010 at 10:48 PM, Leo Alekseyev <dnqu...@gmail.com> wrote:
> I thought I was running Hive with those changes merged in, but to make > sure, I built the latest trunk version. The behavior changed somewhat > (as in, it runs 2 stages instead of 1), but it still generates the > same number of files (# of files generated is equal to the number of > the original mappers, so I have no idea what the second stage is > actually doing). > > See below for query / explain query. Stage 1 runs always; Stage 3 > runs if hive.merge.mapfiles=true is set, but it still generates lots > of small files. > > The query is kind of large, but in essence it's simply > insert overwrite table foo partition(bar) select [columns] from > [table] tablesample(bucket 1 out of 10000 on rand()) where > [conditions]. > > > explain insert overwrite table hbase_prefilter3_us_sample partition > (ds) select > server_host,client_ip,time_stamp,concat(server_host,':',regexp_extract(request_url,'/[^/]+/[^/]+/([^/]+)$',1)),referrer,parse_url(referrer,'HOST'),user_agent,cookie,geoip_int(client_ip, > 'COUNTRY_CODE', './GeoIP.dat'),'',ds from alogs_master > TABLESAMPLE(BUCKET 1 OUT OF 10000 ON rand()) am_s where > am_s.ds='2010-11-05' and am_s.request_url rlike > '^/img[0-9]+/[0-9]+/[^.]+\.(png|jpg|gif|mp4|swf)$' and > geoip_int(am_s.client_ip, 'COUNTRY_CODE', './GeoIP.dat')='US'; > OK > ABSTRACT SYNTAX TREE: > (TOK_QUERY (TOK_FROM (TOK_TABREF alogs_master (TOK_TABLESAMPLE 1 > 10000 (TOK_FUNCTION rand)) am_s)) (TOK_INSERT (TOK_DESTINATION > (TOK_TAB hbase_prefilter3_us_sample (TOK_PARTSPEC (TOK_PARTVAL ds)))) > (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL server_host)) (TOK_SELEXPR > (TOK_TABLE_OR_COL client_ip)) (TOK_SELEXPR (TOK_TABLE_OR_COL > time_stamp)) (TOK_SELEXPR (TOK_FUNCTION concat (TOK_TABLE_OR_COL > server_host) ':' (TOK_FUNCTION regexp_extract (TOK_TABLE_OR_COL > request_url) '/[^/]+/[^/]+/([^/]+)$' 1))) (TOK_SELEXPR > (TOK_TABLE_OR_COL referrer)) (TOK_SELEXPR (TOK_FUNCTION parse_url > (TOK_TABLE_OR_COL referrer) 'HOST')) (TOK_SELEXPR (TOK_TABLE_OR_COL > user_agent)) (TOK_SELEXPR (TOK_TABLE_OR_COL cookie)) (TOK_SELEXPR > (TOK_FUNCTION geoip_int (TOK_TABLE_OR_COL client_ip) 'COUNTRY_CODE' > './GeoIP.dat')) (TOK_SELEXPR '') (TOK_SELEXPR (TOK_TABLE_OR_COL ds))) > (TOK_WHERE (and (and (= (. (TOK_TABLE_OR_COL am_s) ds) '2010-11-05') > (rlike (. (TOK_TABLE_OR_COL am_s) request_url) > '^/img[0-9]+/[0-9]+/[^.]+\.(png|jpg|gif|mp4|swf)$')) (= (TOK_FUNCTION > geoip_int (. (TOK_TABLE_OR_COL am_s) client_ip) 'COUNTRY_CODE' > './GeoIP.dat') 'US'))))) > > STAGE DEPENDENCIES: > Stage-1 is a root stage > Stage-5 depends on stages: Stage-1 , consists of Stage-4, Stage-3 > Stage-4 > Stage-0 depends on stages: Stage-4, Stage-3 > Stage-2 depends on stages: Stage-0 > Stage-3 > > STAGE PLANS: > Stage: Stage-1 > Map Reduce > Alias -> Map Operator Tree: > am_s > TableScan > alias: am_s > Filter Operator > predicate: > expr: (((hash(rand()) & 2147483647) % 10000) = 0) > type: boolean > Filter Operator > predicate: > expr: ((request_url rlike > '^/img[0-9]+/[0-9]+/[^.]+.(png|jpg|gif|mp4|swf)$') and > (GenericUDFGeoIP ( client_ip, 'COUNTRY_CODE', './GeoIP.dat' ) = 'US')) > type: boolean > Filter Operator > predicate: > expr: (((ds = '2010-11-05') and (request_url > rlike '^/img[0-9]+/[0-9]+/[^.]+.(png|jpg|gif|mp4|swf)$')) and > (GenericUDFGeoIP ( client_ip, 'COUNTRY_CODE', './GeoIP.dat' ) = 'US')) > type: boolean > Select Operator > expressions: > expr: server_host > type: string > expr: client_ip > type: int > expr: time_stamp > type: int > expr: concat(server_host, ':', > regexp_extract(request_url, '/[^/]+/[^/]+/([^/]+)$', 1)) > type: string > expr: referrer > type: string > expr: parse_url(referrer, 'HOST') > type: string > expr: user_agent > type: string > expr: cookie > type: string > expr: GenericUDFGeoIP ( client_ip, > 'COUNTRY_CODE', './GeoIP.dat' ) > type: string > expr: '' > type: string > expr: ds > type: string > outputColumnNames: _col0, _col1, _col2, _col3, > _col4, _col5, _col6, _col7, _col8, _col9, _col10 > File Output Operator > compressed: true > GlobalTableId: 1 > table: > input format: > org.apache.hadoop.mapred.TextInputFormat > output format: > org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat > serde: > org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe > name: hbase_prefilter3_us_sample > > Stage: Stage-5 > Conditional Operator > > Stage: Stage-4 > Move Operator > files: > hdfs directory: true > destination: > hdfs:// > namenode.imageshack.us:9000/tmp/hive-hadoop/hive_2010-11-18_17-58-36_843_6726655151866456030/-ext-10000 > > Stage: Stage-0 > Move Operator > tables: > partition: > ds > replace: true > table: > input format: org.apache.hadoop.mapred.TextInputFormat > output format: > org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat > serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe > name: hbase_prefilter3_us_sample > > Stage: Stage-2 > Stats-Aggr Operator > > Stage: Stage-3 > Map Reduce > Alias -> Map Operator Tree: > hdfs:// > namenode.imageshack.us:9000/tmp/hive-hadoop/hive_2010-11-18_17-58-36_843_6726655151866456030/-ext-10002 > File Output Operator > compressed: true > GlobalTableId: 0 > table: > input format: org.apache.hadoop.mapred.TextInputFormat > output format: > org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat > serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe > name: hbase_prefilter3_us_sample > > > > > On Thu, Nov 18, 2010 at 3:44 PM, Ning Zhang <nzh...@fb.com> wrote: > > I see. If you are using dynamic partitions, HIVE-1307 and HIVE-1622 need > to be there for merging to take place. HIVE-1307 was committed to trunk on > 08/25 and HIVE-1622 was committed on 09/13. The simplest way is to update > your Hive trunk and rerun the query. If it still doesn't work maybe you can > post your query and the result of 'explain <query>' and we can take a look. > > > > Ning > > > > On Nov 18, 2010, at 2:57 PM, Leo Alekseyev wrote: > > > >> Hi Ning, > >> For the dataset I'm experimenting with, the total size of the output > >> is 2mb, and the files are at most a few kb in size. My > >> hive.input.format was set to default HiveInputFormat; however, when I > >> set it to CombineHiveInputFormat, it only made the first stage of the > >> job use fewer mappers. The merge job was *still* filtered out at > >> runtime. I also tried set hive.mergejob.maponly=false; that didn't > >> have any effect. > >> > >> I am a bit at a loss what to do here. Is there a way to see what's > >> going on exactly using e.g. debug log levels?.. Btw, I'm also using > >> dynamic partitions; could that somehow be interfering with the merge > >> job?.. > >> > >> I'm running a relatively fresh Hive from trunk (built maybe a month > ago). > >> > >> --Leo > >> > >> On Thu, Nov 18, 2010 at 1:12 PM, Ning Zhang <nzh...@fb.com> wrote: > >>> The settings looks good. The parameter > hive.merge.size.smallfiles.avgsize is used to determine at run time if a > merge should be triggered: if the average size of the files in the partition > is SMALLER than the parameter and there are more than 1 file, the merge > should be scheduled. Can you try to see if you have any big files as well in > your resulting partition? If it is because of a very large file, you can set > the parameter large enough. > >>> > >>> Another possibility is that your Hadoop installation does not support > CombineHiveInputFormat, which is used for the new merge job. Someone > reported previously merge was not successful because of this. If that's the > case, you can turn off CombineHiveInputFormat and use the old > HiveInputFormat (though slower) by setting hive.mergejob.maponly=false. > >>> > >>> Ning > >>> On Nov 17, 2010, at 6:00 PM, Leo Alekseyev wrote: > >>> > >>>> I have jobs that sample (or generate) a small amount of data from a > >>>> large table. At the end, I get e.g. about 3000 or more files of 1kb > >>>> or so. This becomes a nuisance. How can I make Hive do another pass > >>>> to merge the output? I have the following settings: > >>>> > >>>> hive.merge.mapfiles=true > >>>> hive.merge.mapredfiles=true > >>>> hive.merge.size.per.task=256000000 > >>>> hive.merge.size.smallfiles.avgsize=16000000 > >>>> > >>>> After setting hive.merge* to true, Hive started indicating "Total > >>>> MapReduce jobs = 2". However, after generating the > >>>> lots-of-small-files table, Hive says: > >>>> Ended Job = job_201011021934_1344 > >>>> Ended Job = 781771542, job is filtered out (removed at runtime). > >>>> > >>>> Is there a way to force the merge, or am I missing something? > >>>> --Leo > >>> > >>> > > > > > -- Dave Brondsema Software Engineer Geeknet www.geek.net