Hi Benjamin,

In case you are interested, SQL Windowing
(https://github.com/hbutani/SQLWindowing) is designed for these kinds of use
cases.
Your query would be expressed as:

from < select symbol, dt, cast(close AS FLOAT) as close
       from raw_symbols >
partition by symbol
order by dt
with
    avg(close) over rows between unbounded preceding and current row as rollingavg
select symbol, dt, rollingavg
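
For comparison, the same computation in SQL-standard window-function syntax
(which Hive itself doesn't currently support) would read:

select symbol, dt,
       avg(cast(close AS FLOAT)) over
           (partition by symbol order by dt
            rows between unbounded preceding and current row) as rollingavg
from raw_symbols;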

It is along the lines of Aster's MR table functions, so you can express things
like time-series analysis or basket analysis as a query instead of having to
write custom jobs or long scripts of SQL. It's in an alpha state; I am looking
for users to work with.

Regards,
Harish.

From: Igor Tatarinov [mailto:i...@decide.com] 
Sent: Monday, January 23, 2012 11:27 PM
To: user@hive.apache.org
Subject: Re: Performance problems with Hive script

To compute moving averages, you should implement a custom reducer instead of 
doing a big join. That will work *much* faster.
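
For example, something along these lines (an untested sketch; moving_avg.py
here stands in for a reducer script you would write, which reads tab-separated
(symbol, dt, close) lines from stdin, sorted by symbol and date, keeps a
sliding 90-day window per symbol, and writes (date, average, symbol) lines to
stdout):

ADD FILE moving_avg.py;

FROM (
  SELECT symbol, dt, cast(close AS FLOAT) AS close
  FROM raw_symbols
  WHERE dt != 'Date'
  DISTRIBUTE BY symbol
  SORT BY symbol, dt
) sorted
INSERT OVERWRITE TABLE symbols_with_avgs
PARTITION (symbol)
REDUCE sorted.symbol, sorted.dt, sorted.close
  USING 'python moving_avg.py'
  AS (current_date STRING, close FLOAT, symbol STRING);

One pass over the data, no join, and the DISTRIBUTE BY / SORT BY guarantees
each reducer sees one symbol's rows in date order.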

Also, Hive already has date_add(). Why did you have to implement your own?
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-DateFunctions
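
For example (Hive needs a FROM source, so any small existing table works):

select date_add('2012-01-01', 90) from distinct_dt limit 1;
-- returns '2012-03-31'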

igor
decide.com

On Mon, Jan 23, 2012 at 6:48 PM, Benjamin Poserow <bpose...@gmail.com> wrote:
I wrote, separately, a Hadoop job to calculate running averages of about 2000
stock tickers over a 180-day period, as well as a Hive script that performs the
equivalent functionality. I have been using Amazon Elastic MapReduce as my
platform for running these jobs. I have been trying for a while to get my Hive
script to perform well when spread over many nodes, but cannot get it to
perform nearly as well as the Hadoop job. (The Hadoop job takes about an hour
to run through all of my tickers, whereas the Hive job takes over an hour just
to run about 1/8 of them, and I cannot even get it to finish when I run it on a
larger number of tickers.) I also have not seen large gains when running my
Hive job on a larger number of hosts. I've tinkered with settings, examined the
query plans of my queries, and attempted many modifications to them, but have
not seen great gains in performance.

Here is my code. Can you help me identify potential problem points and ways I
can improve these queries, especially so that they distribute well when run on
multiple hosts? I tried to add comments where appropriate to make it clear what
I am doing in each step. Please note there are about 2000 * 180 = 360,000 rows
in the raw symbol table.

Please help, I am quite stuck on this!  Feel free to ask any questions for 
which you would like clarification.

Here is my script:

ADD JAR ${INPUT}/market-data.jar ;
ADD JAR ${INPUT}/HiveJars/derby.jar;

set hive.stats.autogather=false;

set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.dynamic.partition=true;
set hive.exec.reducers.bytes.per.reducer=1000000000;
set hive.exec.max.dynamic.partitions.pernode=200000;
set hive.exec.max.dynamic.partitions=200000;
set hive.exec.max.created.files=1000000;

-- Note: ${INPUT} is the S3 URL where my scripts and input files are stored.
-- ${INPUT}/hiveinput/output contains separate folders labeled symbol=[ticker symbol]
-- so that they can be imported into a partitioned table. The files in these
-- folders contain the ticker prices of each of the stocks over a 180-day
-- period, obtained from Yahoo Finance.
CREATE EXTERNAL TABLE raw_symbols
(dt STRING, open STRING, high STRING, low STRING, close STRING,
 volume STRING, adj_close STRING)
PARTITIONED BY (symbol STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '${INPUT}/hiveinput/output' ;

-- Elastic MapReduce requires you to execute this command to create all of the
-- dynamic partitions corresponding to the stock tickers.
ALTER TABLE raw_symbols RECOVER PARTITIONS;

-- This simply loads a table with the integer sequence 1 through 90. I
-- couldn't find anything in Hive to generate a simple integer sequence, so
-- the table is loaded from a pre-built file.
CREATE EXTERNAL TABLE day_seq_orig
(day INT)
ROW FORMAT DELIMITED LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '${SEQFILE}';
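-- (note: the ${SEQFILE} input itself can be produced outside Hive; for
-- example, the shell one-liner `seq 1 90 > seq.txt` generates the 90
-- integers, one per line, ready to upload to S3)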

-- A temporary table to hold the distinct list of dates for which we have
-- stock prices; there should be 180 dates for the 180 days we are covering.
CREATE TABLE distinct_dt
(dt STRING)
STORED AS SEQUENCEFILE;

-- ${SAMPLE_SYMBOL} is just one of my symbols. Since the same sequence of
-- dates applies to all tickers, this gives me an easy and quick way to get
-- the range of dates.
INSERT OVERWRITE TABLE distinct_dt
SELECT distinct dt
FROM raw_symbols
WHERE raw_symbols.symbol = '${SAMPLE_SYMBOL}';

CREATE TABLE day_seq
(current_date STRING, original_date STRING)
STORED AS SEQUENCEFILE;

-- We are calculating a 90-day rolling average for each stock ticker price, so
-- I want the dates within 90 days of each date for which I have stock ticker
-- prices. date_add is a custom Hive function I implemented; it does just what
-- it implies: it adds an integral number of days to a date.
INSERT OVERWRITE TABLE day_seq
SELECT date_add(dt, day), dt
FROM distinct_dt
JOIN day_seq_orig ON (1=1);
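-- (the ON (1=1) condition above is a deliberate cross join: each of the ~180
-- distinct dates is paired with every one of the 90 day offsets, which is what
-- builds the date window used for the rolling average)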

CREATE TABLE converted_symbols
(
symbol STRING,
original_date STRING,
close FLOAT
 )
STORED AS SEQUENCEFILE;

CREATE TABLE converted_symbols_cf
(
original_date STRING,
close FLOAT
 )
PARTITIONED BY (symbol STRING)
STORED AS RCFILE;

CREATE TABLE converted_symbols_cf_abbr
(
original_date STRING,
close FLOAT
)
PARTITIONED BY (symbol STRING)
STORED AS RCFILE;

CREATE TABLE dt_partitioned_symbols
(
symbol STRING,
close FLOAT
)
PARTITIONED BY (original_date STRING)
STORED AS SEQUENCEFILE;


CREATE TABLE joined_symbols
(
original_date STRING,
current_date STRING, close FLOAT
 )
PARTITIONED BY (symbol STRING)
STORED AS SEQUENCEFILE;

-- Take the raw symbol stock prices, which have had no data conversions
-- applied, convert the price to a float, and put the result into a temp table.
INSERT OVERWRITE TABLE converted_symbols_cf
PARTITION (symbol)
SELECT
       dt, cast(close AS FLOAT), symbol
FROM raw_symbols
DISTRIBUTE BY symbol;

-- Remove some erroneous header rows and put in another temp table
INSERT OVERWRITE TABLE converted_symbols_cf_abbr
PARTITION (symbol)
SELECT
    original_date, close, symbol
FROM converted_symbols_cf
WHERE original_date != 'Date'
DISTRIBUTE BY symbol;
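
-- (the rows where original_date = 'Date' are presumably the header lines of
-- the Yahoo Finance CSV files, which the text-format raw_symbols table reads
-- in as ordinary data)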

set hive.exec.max.dynamic.partitions.pernode=200000;
set hive.exec.max.dynamic.partitions=200000;

-- Join my date table with my latest stock ticker price table. Use a map join
-- because the day_seq table only has about 18,000 rows.
INSERT OVERWRITE TABLE joined_symbols
PARTITION (symbol)
SELECT /*+ MAPJOIN(day_seq) */ day_seq.original_date, day_seq.current_date,
       close, symbol
FROM converted_symbols_cf_abbr
JOIN day_seq ON (converted_symbols_cf_abbr.original_date = day_seq.original_date)
distribute by symbol;

CREATE EXTERNAL TABLE symbols_with_avgs
(current_date STRING, close FLOAT)
PARTITIONED BY(symbol STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '${OUTPUT}/hiveoutput' ;

-- Group and calculate my averages and output
INSERT OVERWRITE TABLE symbols_with_avgs
PARTITION (symbol)
SELECT current_date, avg(close), symbol
FROM joined_symbols
GROUP BY symbol, current_date
DISTRIBUTE BY symbol;
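
-- The net effect: for each symbol and each current_date, the output row holds
-- the average of the closes whose original_date falls within the 90 days
-- leading up to that date, i.e. the 90-day rolling average.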



