To compute the moving averages, implement a custom reducer instead of doing a big join. That will work *much* faster. Also, Hive already has date_add(); why did you have to implement your own?
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-DateFunctions

Rough, untested sketches of both suggestions follow.
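For the moving average itself, by a custom reducer I mean something along these lines. This is only a sketch against your converted_symbols_cf_abbr and symbols_with_avgs tables, and moving_avg.py is just a name I made up: distribute by symbol and sort by date so each reducer sees one ticker's rows in date order, then keep a sliding window in a small script instead of joining every row against ~90 copies of itself.

ADD FILE moving_avg.py;    -- hypothetical reducer script, contents sketched below

FROM (
  SELECT symbol, original_date, close
  FROM converted_symbols_cf_abbr
  DISTRIBUTE BY symbol              -- all rows for one ticker go to the same reducer
  SORT BY symbol, original_date     -- and arrive there in date order
) ordered
INSERT OVERWRITE TABLE symbols_with_avgs
PARTITION (symbol)
SELECT TRANSFORM (symbol, original_date, close)
  USING 'python moving_avg.py'
  AS (current_date STRING, close FLOAT, symbol STRING);

where moving_avg.py is roughly:

#!/usr/bin/env python
# Sketch of a streaming reducer: Hive feeds tab-separated (symbol, date, close)
# rows, already sorted by symbol and date. Keep the last 90 closes per symbol
# and emit date, trailing average, symbol.
import sys
from collections import deque

WINDOW = 90
prev_symbol = None
window = deque()

for line in sys.stdin:
    symbol, dt, close = line.rstrip('\n').split('\t')
    if symbol != prev_symbol:       # new ticker: start a fresh window
        prev_symbol = symbol
        window = deque()
    window.append(float(close))
    if len(window) > WINDOW:        # keep at most the last 90 rows
        window.popleft()
    print('\t'.join([dt, str(sum(window) / len(window)), symbol]))

Note this averages the 90 most recent rows (trading days); if you need the calendar-day window your date_add join produces, keep (date, close) pairs in the deque and evict by date difference instead. Either way it is a single pass over the ~360,000 rows, with no 90x row blow-up before the GROUP BY.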
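For the dates, assuming your dt strings are already in yyyy-MM-dd form (which is what the built-in expects), the built-in date_add(startdate, days) should be a drop-in replacement for your UDF; per the wiki, date_add('2008-12-31', 1) returns '2009-01-01'. The day_seq query itself would not need to change:

INSERT OVERWRITE TABLE day_seq
SELECT date_add(dt, day), dt      -- built-in date_add, no custom UDF needed
FROM distinct_dt
JOIN day_seq_orig ON (1=1);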
igor
decide.com

On Mon, Jan 23, 2012 at 6:48 PM, Benjamin Poserow <bpose...@gmail.com> wrote:

> I wrote, separately, a Hadoop job to calculate running averages of about
> 2000 stock tickers over a 180 day period as well as a Hive script which
> performs equivalent functionality. I have been using Amazon Elastic
> MapReduce as my platform for running these jobs. I have been trying for a
> while to get my Hive script to perform well when spread over many nodes,
> but cannot seem to get the Hive script to perform nearly as well as the
> Hadoop job. (The Hadoop job takes about an hour to run through all of my
> tickers, whereas the Hive job takes over an hour just to run about 1/8 of
> them and I cannot even seem to get it to finish when I run it for a larger
> number of tickers.) I also have not seen large gains when running my Hive
> job using a larger number of hosts. I've been trying to tinker with
> settings, examine the query plans of my queries, attempt many modifications
> of my queries, but have not seen great gains in performance.
>
> Here is my code. Can you help me identify potential problem points and
> ways I can improve these queries, especially so they distribute well when
> run on multiple hosts. I tried to add comments where appropriate to make
> it clear what I was doing in each step. Please note there are about 2000 *
> 180 = 360,000 rows in the raw symbol table.
>
> Please help, I am quite stuck on this! Feel free to ask any questions for
> which you would like clarification.
>
> Here is my script:
>
> ADD JAR ${INPUT}/market-data.jar ;
> ADD JAR ${INPUT}/HiveJars/derby.jar;
>
> set hive.stats.autogather=false;
>
> set hive.exec.dynamic.partition.mode=nonstrict;
> set hive.exec.dynamic.partition=true;
> set hive.exec.reducers.bytes.per.reducer=1000000000;
> set hive.exec.max.dynamic.partitions.pernode=200000;
> set hive.exec.max.dynamic.partitions=200000;
> set hive.exec.max.created.files=1000000;
>
> -- Note ${INPUT} is the S3 URL to where my scripts and input files are
> -- stored. ${INPUT}/hiveinput/output contains separate folders labeled
> -- symbol=[ticker symbol] so that they can be imported into a partitioned
> -- table. The files in these folders contain the ticker prices of each of
> -- the stocks over a 180 day period obtained from Yahoo Finance
> CREATE EXTERNAL TABLE raw_symbols
> (dt STRING, open STRING, high STRING, low STRING, close STRING,
> volume STRING, adj_close STRING)
> PARTITIONED BY (symbol STRING)
> ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
> STORED AS TEXTFILE
> LOCATION '${INPUT}/hiveinput/output' ;
>
> -- Elastic MapReduce requires you to execute this command to create all of
> -- the dynamic partitions corresponding to the stock tickers
> ALTER TABLE raw_symbols RECOVER PARTITIONS;
>
> -- This is simply loading a table with the sequence 1 through 90. I
> -- actually couldn't find anything in Hive to create a simple integer
> -- sequence. So this table is loaded with this sequence
> CREATE EXTERNAL TABLE day_seq_orig
> (day INT)
> ROW FORMAT DELIMITED LINES TERMINATED BY '\n'
> STORED AS TEXTFILE
> LOCATION '${SEQFILE}';
>
> -- A temporary table to contain the distinct list of dates for which we
> -- have stock prices, should be 180 dates for the 180 days for which we are
> -- getting info
> CREATE TABLE distinct_dt
> (dt STRING)
> STORED AS SEQUENCEFILE;
>
> -- ${SAMPLE_SYMBOL} is just one of my symbols. Since the same sequence of
> -- dates applies to all tickers, this gives me an easy and quick way to get
> -- the range of dates
> INSERT OVERWRITE TABLE distinct_dt
> SELECT distinct dt
> FROM raw_symbols
> WHERE raw_symbols.symbol = '${SAMPLE_SYMBOL}';
>
> CREATE TABLE day_seq
> (current_date STRING, original_date STRING)
> STORED AS SEQUENCEFILE;
>
> -- We are calculating a 90 day rolling average for each stock ticker
> -- price, so I want to get those dates within 90 days of each of the dates
> -- for which I am getting stock ticker prices. date_add is a custom Hive
> -- function I implemented; it does just what it implies: it adds an
> -- integral number to a date.
> INSERT OVERWRITE TABLE day_seq
> SELECT date_add(dt, day), dt
> FROM distinct_dt
> JOIN day_seq_orig ON (1=1);
>
> CREATE TABLE converted_symbols
> (
> symbol STRING,
> original_date STRING,
> close FLOAT
> )
> STORED AS SEQUENCEFILE;
>
> CREATE TABLE converted_symbols_cf
> (
> original_date STRING,
> close FLOAT
> )
> PARTITIONED BY (symbol STRING)
> STORED AS RCFILE;
>
> CREATE TABLE converted_symbols_cf_abbr
> (
> original_date STRING,
> close FLOAT
> )
> PARTITIONED BY (symbol STRING)
> STORED AS RCFILE;
>
> CREATE TABLE dt_partitioned_symbols
> (
> symbol STRING,
> close FLOAT
> )
> PARTITIONED BY (original_date STRING)
> STORED AS SEQUENCEFILE;
>
> CREATE TABLE joined_symbols
> (
> original_date STRING,
> current_date STRING, close FLOAT
> )
> PARTITIONED BY (symbol STRING)
> STORED AS SEQUENCEFILE;
>
> -- Take the raw symbol stock prices, which did no data conversions, and
> -- convert the price to a float and put into a temp table
> INSERT OVERWRITE TABLE converted_symbols_cf
> PARTITION (symbol)
> SELECT
> dt, cast(close AS FLOAT), symbol
> FROM raw_symbols
> DISTRIBUTE BY symbol;
>
> -- Remove some erroneous header rows and put in another temp table
> INSERT OVERWRITE TABLE converted_symbols_cf_abbr
> PARTITION (symbol)
> SELECT
> original_date, close, symbol
> FROM converted_symbols_cf
> WHERE original_date != 'Date'
> DISTRIBUTE BY symbol;
>
> set hive.exec.max.dynamic.partitions.pernode=200000;
> set hive.exec.max.dynamic.partitions=200000;
>
> -- Join my date table with my latest stock ticker price table. Use a map
> -- join because the day_seq table only has about 18,000 rows
> INSERT OVERWRITE TABLE joined_symbols
> PARTITION (symbol)
> SELECT /*+ MAPJOIN(day_seq) */ day_seq.original_date, day_seq.current_date,
> close, symbol
> FROM converted_symbols_cf_abbr
> JOIN day_seq ON (converted_symbols_cf_abbr.original_date =
> day_seq.original_date)
> DISTRIBUTE BY symbol;
>
> CREATE EXTERNAL TABLE symbols_with_avgs
> (current_date STRING, close FLOAT)
> PARTITIONED BY (symbol STRING)
> ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
> STORED AS TEXTFILE
> LOCATION '${OUTPUT}/hiveoutput' ;
>
> -- Group and calculate my averages and output
> INSERT OVERWRITE TABLE symbols_with_avgs
> PARTITION (symbol)
> SELECT current_date, avg(close), symbol
> FROM joined_symbols
> GROUP BY symbol, current_date
> DISTRIBUTE BY symbol;