Thanks again.

I think I figured out the bug (not sure if it's a bug or a known limitation
when creating a third-level join). We need another table, table_c, to
re-create my scenario.

table_a
create table table_a(a_id bigint, common_id bigint, int_a int, int_b int,
int_c int, int_d int, string_a string, total_count bigint) partitioned by
(part_col string)  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES
TERMINATED BY '\n' STORED AS TEXTFILE;

table_b
create table table_b(b_id bigint, common_id bigint, int_a int, int_b int,
int_c int, int_d int, string_b string, total_count bigint) partitioned by
(part_col string)  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES
TERMINATED BY '\n' STORED AS TEXTFILE;
table_c
create table table_c(int_c int, string_c string) ROW FORMAT DELIMITED FIELDS
TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE;

query
explain select c.string_c, sum(a.total_count), sum(b.total_count) from
table_a a join table_b b on a.common_id = b.common_id and a.int_c = b.int_c
and a.int_d = b.int_d join table_c c on a.int_c = c.int_c where a.part_col
>= "blah1" and b.part_col >= "blah1" group by c.string_c;
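For reference, here is what that query should return on a conventional SQL
engine. This is a minimal sketch of my own (not from the thread), using
Python's sqlite3 as a stand-in for MySQL/SQL Server, with made-up sample rows
chosen so that the int_c join condition actually matters (partition columns
and the unused int columns are simplified):

```python
# Sketch: run the equivalent three-way join in SQLite to show the expected
# result. Schema and data are illustrative, not from the original thread.
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.executescript("""
CREATE TABLE table_a (a_id INTEGER, common_id INTEGER, int_c INTEGER,
                      int_d INTEGER, total_count INTEGER, part_col TEXT);
CREATE TABLE table_b (b_id INTEGER, common_id INTEGER, int_c INTEGER,
                      int_d INTEGER, total_count INTEGER, part_col TEXT);
CREATE TABLE table_c (int_c INTEGER, string_c TEXT);
""")
# The two a/b pairs share common_id and int_d but differ on int_c, so if the
# plan dropped int_c from table_b's side of the join, every a row would match
# both b rows and the sums would be inflated.
cur.executemany("INSERT INTO table_a VALUES (?,?,?,?,?,?)",
                [(1, 100, 1, 5, 10, "blah2"),
                 (2, 100, 2, 5, 20, "blah2")])
cur.executemany("INSERT INTO table_b VALUES (?,?,?,?,?,?)",
                [(1, 100, 1, 5, 7, "blah2"),
                 (2, 100, 2, 5, 8, "blah2")])
cur.executemany("INSERT INTO table_c VALUES (?,?)",
                [(1, "one"), (2, "two")])

rows = cur.execute("""
SELECT c.string_c, SUM(a.total_count), SUM(b.total_count)
FROM table_a a
JOIN table_b b ON a.common_id = b.common_id
              AND a.int_c = b.int_c AND a.int_d = b.int_d
JOIN table_c c ON a.int_c = c.int_c
WHERE a.part_col >= 'blah1' AND b.part_col >= 'blah1'
GROUP BY c.string_c
ORDER BY c.string_c
""").fetchall()
print(rows)  # [('one', 10, 7), ('two', 20, 8)]
```

With int_c correctly part of the join, each a row matches exactly one b row,
which is the result the Hive query is expected to produce as well.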

If you look at the query plan for table_b, you will see that the value
expressions do not project the column int_c, and if you look at the join
operator in the query plan, it shows a join operating on 3 columns from
table_a and 2 columns from table_b, which is not the intention of the query.

I think Hive should output all the columns from table_b that are part of the
join conditions, rather than checking whether a column will be consumed in
later stages.

Do you think I am writing the Hive query incorrectly? The same query would
return results as expected in a MySQL or SQL Server environment.

Thanks,
Viral
On Wed, Jan 19, 2011 at 11:06 AM, Appan Thirumaligai <
athirumali...@ngmoco.com> wrote:

>  EXPLAIN select
> t1.some_string,t2.some_string,sum(t1.total_count),sum(t2.total_count) from
> table_a t1 join table_b t2 on t1.part_col = t2.part_col and t1.common_id =
> t2.common_id where t1.part_col >= 'mypart' and t2.part_col >= 'mypart' group
> by t1.some_string,t2.some_string;
>
> OK
> ABSTRACT SYNTAX TREE:
>   (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF table_a t1) (TOK_TABREF
> table_b t2) (and (= (. (TOK_TABLE_OR_COL t1) part_col) (. (TOK_TABLE_OR_COL
> t2) part_col)) (= (. (TOK_TABLE_OR_COL t1) common_id) (. (TOK_TABLE_OR_COL
> t2) common_id))))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE))
> (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL t1) some_string)) (TOK_SELEXPR
> (. (TOK_TABLE_OR_COL t2) some_string)) (TOK_SELEXPR (TOK_FUNCTION sum (.
> (TOK_TABLE_OR_COL t1) total_count))) (TOK_SELEXPR (TOK_FUNCTION sum (.
> (TOK_TABLE_OR_COL t2) total_count)))) (TOK_WHERE (and (>= (.
> (TOK_TABLE_OR_COL t1) part_col) 'mypart') (>= (. (TOK_TABLE_OR_COL t2)
> part_col) 'mypart'))) (TOK_GROUPBY (. (TOK_TABLE_OR_COL t1) some_string) (.
> (TOK_TABLE_OR_COL t2) some_string))))
>
> STAGE DEPENDENCIES:
>   Stage-1 is a root stage
>   Stage-2 depends on stages: Stage-1
>   Stage-0 is a root stage
>
> STAGE PLANS:
>   Stage: Stage-1
>     Map Reduce
>       Alias -> Map Operator Tree:
>         t1
>           TableScan
>             alias: t1
>             Filter Operator
>               predicate:
>                   expr: (part_col >= 'mypart')
>                   type: boolean
>               Reduce Output Operator
>                 key expressions:
>                       expr: part_col
>                       type: string
>                       expr: common_id
>                       type: bigint
>                 sort order: ++
>                 Map-reduce partition columns:
>                       expr: part_col
>                       type: string
>                       expr: common_id
>                       type: bigint
>                 tag: 0
>                 value expressions:
>                       expr: some_string
>                       type: string
>                       expr: total_count
>                       type: bigint
>                       expr: part_col
>                       type: string
>         t2
>           TableScan
>             alias: t2
>             Filter Operator
>               predicate:
>                   expr: (part_col >= 'mypart')
>                   type: boolean
>               Reduce Output Operator
>                 key expressions:
>                       expr: part_col
>                       type: string
>                       expr: common_id
>                       type: bigint
>                 sort order: ++
>                 Map-reduce partition columns:
>                       expr: part_col
>                       type: string
>                       expr: common_id
>                       type: bigint
>                 tag: 1
>                 value expressions:
>                       expr: some_string
>                       type: string
>                       expr: total_count
>                       type: bigint
>                       expr: part_col
>                       type: string
>       Reduce Operator Tree:
>         Join Operator
>           condition map:
>                Inner Join 0 to 1
>           condition expressions:
>             0 {VALUE._col2} {VALUE._col3} {VALUE._col4}
>             1 {VALUE._col2} {VALUE._col3} {VALUE._col4}
>           handleSkewJoin: false
>           outputColumnNames: _col2, _col3, _col4, _col9, _col10, _col11
>           Filter Operator
>             predicate:
>                 expr: ((_col4 >= 'mypart') and (_col11 >= 'mypart'))
>                 type: boolean
>             Select Operator
>               expressions:
>                     expr: _col2
>                     type: string
>                     expr: _col9
>                     type: string
>                     expr: _col3
>                     type: bigint
>                     expr: _col10
>                     type: bigint
>               outputColumnNames: _col2, _col9, _col3, _col10
>               Group By Operator
>                 aggregations:
>                       expr: sum(_col3)
>                       expr: sum(_col10)
>                 bucketGroup: false
>                 keys:
>                       expr: _col2
>                       type: string
>                       expr: _col9
>                       type: string
>                 mode: hash
>                 outputColumnNames: _col0, _col1, _col2, _col3
>                 File Output Operator
>                   compressed: false
>                   GlobalTableId: 0
>                   table:
>                       input format:
> org.apache.hadoop.mapred.SequenceFileInputFormat
>                       output format:
> org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
>
>   Stage: Stage-2
>     Map Reduce
>       Alias -> Map Operator Tree:
>
> hdfs://localhost:8022/tmp/hive-training/hive_2011-01-19_11-05-34_526_453408324928472657/-mr-10002
>
>             Reduce Output Operator
>               key expressions:
>                     expr: _col0
>                     type: string
>                     expr: _col1
>                     type: string
>               sort order: ++
>               Map-reduce partition columns:
>                     expr: _col0
>                     type: string
>                     expr: _col1
>                     type: string
>               tag: -1
>               value expressions:
>                     expr: _col2
>                     type: bigint
>                     expr: _col3
>                     type: bigint
>       Reduce Operator Tree:
>         Group By Operator
>           aggregations:
>                 expr: sum(VALUE._col0)
>                 expr: sum(VALUE._col1)
>           bucketGroup: false
>           keys:
>                 expr: KEY._col0
>                 type: string
>                 expr: KEY._col1
>                 type: string
>           mode: mergepartial
>           outputColumnNames: _col0, _col1, _col2, _col3
>           Select Operator
>             expressions:
>                   expr: _col0
>                   type: string
>                   expr: _col1
>                   type: string
>                   expr: _col2
>                   type: bigint
>                   expr: _col3
>                   type: bigint
>             outputColumnNames: _col0, _col1, _col2, _col3
>             File Output Operator
>               compressed: false
>               GlobalTableId: 0
>               table:
>                   input format: org.apache.hadoop.mapred.TextInputFormat
>                   output format:
> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>
>   Stage: Stage-0
>     Fetch Operator
>       limit: -1
>
>  On Jan 19, 2011, at 10:47 AM, Viral Bajaria wrote:
>
>  Thanks Appan for verifying. I will do some more tests on my side too and
> let you know the results.
>
> I tried a different version of the query where I joined two sub-queries
> for the same partitions, and the data comes out correct.
>
> I will see if I can post the real-world example to the list, because that
> might sound like a more practical example.
>
> If you still have your example(s) do you mind sending me your query-plan
> for
>
> select
> t1.some_string,t2.some_string,sum(t1.total_count),sum(t2.total_count) from
> table_a t1 join table_b t2 on t1.part_col = t2.part_col and t1.common_id =
> t2.common_id where t1.part_col >= 'mypart' and t2.part_col >= 'mypart' group
> by t1.some_string,t2.some_string;
>
> -Viral
> On Wed, Jan 19, 2011 at 10:36 AM, Appan Thirumaligai <
> athirumali...@ngmoco.com> wrote:
>
>> Viral,
>>
>> I tried the queries below (similar to yours) and I get the expected
>> results when I do the join. I ran my queries after building Hive from the
>> latest source, on Hadoop 0.20+.
>>  create table table_a(a_id bigint, common_id bigint, some_string
>> string,total_count bigint) partitioned by (part_col string)  ROW FORMAT
>> DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS
>> TEXTFILE;
>> create table table_b(b_id bigint, common_id bigint, some_string
>> string,total_count bigint) partitioned by (part_col string)  ROW FORMAT
>> DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS
>> TEXTFILE;
>> dfs -mkdir /user/data/table_a;
>> dfs -mkdir /user/data/table_b;
>> dfs -put /home/training/hiveug/table_a.csv /user/data/table_a;
>> dfs -put /home/training/hiveug/table_b.csv /user/data/table_b;
>> alter table table_a add partition (part_col = 'mypart') location
>> '/user/data/table_a';
>> alter table table_b add partition (part_col = 'mypart') location
>> '/user/data/table_b';
>> select * from table_a t1 join table_b t2 on t1.part_col == t2.part_col;
>> -->> Returns expected result
>> select
>> t1.some_string,t2.some_string,sum(t1.total_count),sum(t2.total_count) from
>> table_a t1 join table_b t2 on t1.part_col = t2.part_col where t1.part_col >=
>> 'mypart' and t2.part_col >= 'mypart' group by t1.some_string,t2.some_string;
>> --->>Works fine.
>> select * from table_a t1 join table_b t2 on t1.part_col = t2.part_col where
>> t1.part_col >= 'mypart' and t2.part_col >= 'mypart';
>> --->Works fine.
>>
>> I created the two files with sample data in them and copied them to HDFS.
>>
>> I'll try later on your Hive 0.5.0, but it looks like there might be
>> something wrong in your query.
>>
>>  On Jan 18, 2011, at 8:40 PM, Ajo Fod wrote:
>>
>>  Can you try this with a dummy table with very few rows ... to see if
>> the reason the script doesn't finish is a computational issue?
>>
>> One other thing is to try with a combined partition, to see if it is a
>> problem with the partitioning.
>>
>> Also, take a look at the results of an EXPLAIN statement and see if
>> there are any hints there.
>>
>> NOTE: I'm new to hive too.
>>
>> -Ajo
>>
>>
>> On Tue, Jan 18, 2011 at 8:08 PM, Viral Bajaria <viral.baja...@gmail.com>
>> wrote:
>>
>> I haven't heard back from anyone on the list and am still struggling to join
>>
>> two tables on partitioned column
>>
>>
>> Has anyone ever tried joining two tables on a partitioned column and the
>>
>> results were not as expected?
>>
>> On Tue, Jan 18, 2011 at 2:04 AM, Viral Bajaria <viral.baja...@gmail.com>
>>
>> wrote:
>>
>>
>>  I am facing issues with a query where I am joining two fairly large tables
>>  on the partitioned column along with other common columns. The output is
>>  not in line with what I expect it to be. Since the query is very complex,
>>  I will simplify it so that people can provide input if they have faced
>>  similar issues or if I am doing something totally wrong.
>>
>>  TABLE A:
>>  a_id bigint
>>  common_id bigint
>>  some_string string
>>  total_count bigint
>>  part_col string <---- this is the partitioned column
>>
>>  TABLE B:
>>  b_id bigint
>>  common_id bigint
>>  some_string string
>>  total_sum bigint
>>  part_col string <---- this is the partitioned column
>>
>>  Now the query is as follows:
>>
>>  SELECT /*+ STREAMTABLE(A,B) */ A.some_string, B.some_string,
>>  sum(A.total_count), sum(B.total_sum) from A JOIN B ON (t1.part_col =
>>  t2.part_col AND t1.common_id = t2.common_id) WHERE t1.part_col >= 'val1'
>>  AND t2.part_col >= 'val1' GROUP BY A.some_string, B.some_string
>>
>>  Does Hive not like to join on partitioned columns? Because when I create
>>  a join on just the partitioned column, the reduce step never finishes.
>>
>>  I am using Hive 0.5.0
>>
>>  Thanks,
>>  Viral
>>
>>
>>
>>    Appan Thirumaligai
>> ap...@ngmoco.com
>> Ph:1-818-472-8427
>> ngmoco:)
>>
>>
>
>    Appan Thirumaligai
> ap...@ngmoco.com
> Ph:1-818-472-8427
> ngmoco:)
>
>
