To compute moving averages, implement a custom reducer instead of doing
a big join. It will run *much* faster.
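
A minimal sketch of such a reducer, written as a streaming script in Python. Everything in it is my assumption rather than your existing job: the names (moving_averages, parse, WINDOW), the tab-separated input of symbol, date, close, and rows arriving sorted by symbol and then date. It averages over the last 90 rows per symbol, which is not quite the same as the calendar-day window your join builds, so adjust as needed.

```python
import sys
from collections import deque
from itertools import groupby

WINDOW = 90  # assumed window length, counted in rows (trading days)


def parse(lines):
    """Turn tab-separated 'symbol, dt, close' lines into typed tuples."""
    for line in lines:
        symbol, dt, close = line.rstrip("\n").split("\t")
        yield symbol, dt, float(close)


def moving_averages(rows, window=WINDOW):
    """Yield (symbol, dt, avg) for rows sorted by symbol, then dt.

    The average covers the current row and up to window - 1 rows before it.
    """
    for symbol, group in groupby(rows, key=lambda r: r[0]):
        recent = deque()  # closes currently inside the window
        total = 0.0       # running sum of the values in `recent`
        for _, dt, close in group:
            recent.append(close)
            total += close
            if len(recent) > window:
                total -= recent.popleft()
            yield symbol, dt, total / len(recent)


def main(stdin=sys.stdin, stdout=sys.stdout):
    for symbol, dt, avg in moving_averages(parse(stdin)):
        stdout.write("%s\t%s\t%.4f\n" % (symbol, dt, avg))


if __name__ == "__main__":
    main()
```

You would ship the script with ADD FILE and call it through Hive's REDUCE ... USING clause on a subquery that does DISTRIBUTE BY symbol SORT BY symbol, original_date, so each reducer sees one symbol's rows in date order. Each row is then read once and the sequence table and the cross join are no longer needed.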

Also, Hive already has date_add(). Why did you have to implement your own?
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-DateFunctions

igor
decide.com


On Mon, Jan 23, 2012 at 6:48 PM, Benjamin Poserow <bpose...@gmail.com> wrote:

> I wrote, separately, a Hadoop job to calculate running averages of about
> 2000 stock tickers over a 180 day period as well as a Hive script which
> performs equivalent functionality.  I have been using Amazon Elastic
> MapReduce as my platform for running these jobs.   I have been trying for a
> while to get my Hive script to perform well when spread over many nodes,
> but cannot seem to get the Hive script to perform nearly as well as the
> Hadoop job.  (The Hadoop job takes about an hour to run through all of my
> tickers, whereas the Hive job takes over an hour just to run about 1/8 of
> them and I cannot even seem to get it to finish when I run it for a larger
> number of tickers.)  I also have not seen large gains when running my Hive
> job using a larger number of hosts.   I've been trying to tinker with
> settings, examine the query plans of my queries, attempt many modifications
> of my queries, but have not seen great gains in performance.
>
> Here is my code.  Can you help me identify potential problem points and
> ways I can improve these queries, especially so they distribute well when
> run on multiple hosts?  I tried to add comments where appropriate to make
> it clear what I was doing in each step.  Please note there are about 2000 *
> 180 = 360,000 rows in the raw symbol table.
>
> Please help, I am quite stuck on this!  Feel free to ask any questions for
> which you would like clarification.
>
> Here is my script:
>
> ADD JAR ${INPUT}/market-data.jar ;
> ADD JAR ${INPUT}/HiveJars/derby.jar;
>
> set hive.stats.autogather=false;
>
> set hive.exec.dynamic.partition.mode=nonstrict;
> set hive.exec.dynamic.partition=true;
> set hive.exec.reducers.bytes.per.reducer=1000000000;
> set hive.exec.max.dynamic.partitions.pernode=200000;
> set hive.exec.max.dynamic.partitions=200000;
> set hive.exec.max.created.files=1000000;
>
> -- Note ${INPUT} is the S3 URL to where my scripts and input files are
> -- stored.  ${INPUT}/hiveinput/output contains separate folders labeled
> -- symbol=[ticker symbol] so that they can be imported into a partitioned
> -- table.  The files in these folders contain the ticker prices of each of
> -- the stocks over a 180 day period obtained from Yahoo Finance.
> CREATE EXTERNAL TABLE raw_symbols
> (dt STRING, open STRING, high STRING, low STRING, close STRING,
>  volume STRING, adj_close STRING)
> PARTITIONED BY (symbol STRING)
> ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
> STORED AS TEXTFILE
> LOCATION '${INPUT}/hiveinput/output' ;
>
> -- Elastic MapReduce requires you to execute this command to create all of
> -- the dynamic partitions corresponding to the stock tickers
> ALTER TABLE raw_symbols RECOVER PARTITIONS;
>
> -- This is simply loading a table with the sequence 1 through 90.  I
> -- actually couldn't find anything in Hive to create a simple integer
> -- sequence, so this table is loaded with this sequence.
> CREATE EXTERNAL TABLE day_seq_orig
> (day INT)
> ROW FORMAT DELIMITED LINES TERMINATED BY '\n'
> STORED AS TEXTFILE
> LOCATION '${SEQFILE}';
>
> -- A temporary table to contain the distinct list of dates for which we
> -- have stock prices, should be 180 dates for the 180 days for which we are
> -- getting info
> CREATE TABLE distinct_dt
> (dt STRING)
> STORED AS SEQUENCEFILE;
>
> -- ${SAMPLE_SYMBOL} is just one of my symbols.  Since the same sequence of
> -- dates applies to all tickers, this gives me an easy and quick way to get
> -- the range of dates
> INSERT OVERWRITE TABLE distinct_dt
> SELECT distinct dt
> FROM raw_symbols
> WHERE raw_symbols.symbol = '${SAMPLE_SYMBOL}';
>
> CREATE TABLE day_seq
> (current_date STRING, original_date STRING)
> STORED AS SEQUENCEFILE;
>
> -- We are calculating a 90 day rolling average for each stock ticker
> -- price, so I want to get those dates within 90 days of each of the dates
> -- for which I am getting stock ticker prices.  date_add is a custom Hive
> -- function I implemented; it does just what it implies:  it adds an
> -- integral number to a date.
> INSERT OVERWRITE TABLE day_seq
> SELECT date_add(dt, day), dt
> FROM distinct_dt
> JOIN day_seq_orig ON (1=1);
>
> CREATE TABLE converted_symbols
> (
> symbol STRING,
> original_date STRING,
> close FLOAT
>  )
> STORED AS SEQUENCEFILE;
>
> CREATE TABLE converted_symbols_cf
> (
> original_date STRING,
> close FLOAT
>  )
> PARTITIONED BY (symbol STRING)
> STORED AS RCFILE;
>
> CREATE TABLE converted_symbols_cf_abbr
> (
> original_date STRING,
> close FLOAT
> )
> PARTITIONED BY (symbol STRING)
> STORED AS RCFILE;
>
> CREATE TABLE dt_partitioned_symbols
> (
> symbol STRING,
> close FLOAT
> )
> PARTITIONED BY (original_date STRING)
> STORED AS SEQUENCEFILE;
>
>
> CREATE TABLE joined_symbols
> (
> original_date STRING,
> current_date STRING, close FLOAT
>  )
> PARTITIONED BY (symbol STRING)
> STORED AS SEQUENCEFILE;
>
> -- Take the raw symbol stock prices, which did no data conversions, and
> -- convert the price to a float and put into a temp table
> INSERT OVERWRITE TABLE converted_symbols_cf
> PARTITION (symbol)
> SELECT
>        dt, cast(close AS FLOAT), symbol
> FROM raw_symbols
> DISTRIBUTE BY symbol;
>
> -- Remove some erroneous header rows and put in another temp table
> INSERT OVERWRITE TABLE converted_symbols_cf_abbr
> PARTITION (symbol)
> SELECT
>     original_date, close, symbol
> FROM converted_symbols_cf
> WHERE original_date != 'Date'
> DISTRIBUTE BY symbol;
>
> set hive.exec.max.dynamic.partitions.pernode=200000;
> set hive.exec.max.dynamic.partitions=200000;
>
> -- Join my date table with my latest stock ticker price table.  Use a map
> -- join because the day_seq table only has about 18,000 rows
> INSERT OVERWRITE TABLE joined_symbols
> PARTITION (symbol)
> SELECT /*+ MAPJOIN(day_seq) */ day_seq.original_date, day_seq.current_date,
>        close, symbol
> FROM converted_symbols_cf_abbr
> JOIN day_seq ON (converted_symbols_cf_abbr.original_date =
> day_seq.original_date)
> distribute by symbol;
>
> CREATE EXTERNAL TABLE symbols_with_avgs
> (current_date STRING, close FLOAT)
> PARTITIONED BY(symbol STRING)
> ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
> STORED AS TEXTFILE
> LOCATION '${OUTPUT}/hiveoutput' ;
>
> -- Group and calculate my averages and output
> INSERT OVERWRITE TABLE symbols_with_avgs
> PARTITION (symbol)
> SELECT current_date, avg(close), symbol
> FROM joined_symbols
> GROUP BY symbol, current_date
> DISTRIBUTE BY symbol;
>
>
>
>
>
