[ https://issues.apache.org/jira/browse/HIVE-28223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhaojk updated HIVE-28223:
--------------------------
    Attachment: sql_query.txt

> Hive on tez: Association of large and small tables satisfies four conditions 
> simultaneously, resulting in data loss: 1. Close mapjoin, 2. Association has 
> group by, 3. Association has windowing function, and 4. Association condition 
> is int/bigint
> -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: HIVE-28223
>                 URL: https://issues.apache.org/jira/browse/HIVE-28223
>             Project: Hive
>          Issue Type: Bug
>    Affects Versions: 3.1.0
>            Reporter: zhaojk
>            Priority: Major
>         Attachments: sql_query.txt
>
>
> When a large table is joined to a small table and the following four 
> conditions hold at the same time, the query returns fewer rows than it 
> should. First, the join is not converted to a mapjoin. Second, the first 
> subquery of the join contains a group by. Third, the second subquery 
> contains a row_number() over (partition by ...) windowing function. Fourth, 
> the join key is of type int or bigint.
> If one of the subqueries is replaced by a pre-materialized table, the result 
> is correct whether or not mapjoin is enabled. Casting the join key to string 
> also gives the correct result. The detailed SQL and execution plans follow.
> Environment: Hive 3.1.0, Tez 0.9.1
> Mapjoin disabled: the result is wrong (rows are missing)
> set hive.auto.convert.join=false;
> select
> count(1)
> from
>   (
>     select
>       user_id
>     from
>       odb.test0000   --10524428
>     where
>       etl_dt = '20240329'
>     group by
>       user_id
>   ) a
>   JOIN (
>     select
>       user_id,
>       points_balance,
>       row_number() over(
>         partition by user_id
>         order by
>           points_balance desc
>       ) as rank_no
>     from
>       odb.test1111  --1800617
>     where
>       etl_dt = '20240329'
>       and points_balance <> 0
>   ) c on a.user_id = c.user_id
>   where  c.rank_no = 1;
>   
> +--------+
> |  _c0   |
> +--------+
> | 67594  |
> +--------+
> +----------------------------------------------------+
> |                      Explain                       |
> +----------------------------------------------------+
> | Plan optimized by CBO.                             |
> |                                                    |
> | Vertex dependency in root stage                    |
> | Reducer 4 <- Map 1 (SIMPLE_EDGE), Map 3 (SIMPLE_EDGE) |
> | Reducer 5 <- Reducer 4 (CUSTOM_SIMPLE_EDGE)        |
> |                                                    |
> | Stage-0                                            |
> |   Fetch Operator                                   |
> |     limit:-1                                       |
> |     Stage-1                                        |
> |       Reducer 5 vectorized                         |
> |       File Output Operator [FS_43]                 |
> |         Group By Operator [GBY_42] (rows=1 width=8) |
> |           Output:["_col0"],aggregations:["count(VALUE._col0)"] |
> |         <-Reducer 4 [CUSTOM_SIMPLE_EDGE]           |
> |           PARTITION_ONLY_SHUFFLE [RS_21]           |
> |             Group By Operator [GBY_20] (rows=1 width=8) |
> |               Output:["_col0"],aggregations:["count()"] |
> |               Merge Join Operator [MERGEJOIN_35] (rows=1774811 width=2359) |
> |                 Conds:GBY_5._col0=SEL_12._col0(Inner) |
> |               <-Group By Operator [GBY_5] (rows=1613465 width=2359) |
> |                   Output:["_col0"],keys:KEY._col0  |
> |               <-Select Operator [SEL_12] (rows=900308 width=8) |
> |                   Output:["_col0"]                 |
> |                   Filter Operator [FIL_28] (rows=900308 width=572) |
> |                     predicate:(row_number_window_0 = 1) |
> |                     PTF Operator [PTF_11] (rows=1800617 width=572) |
> |                       Function definitions:[{},{"name:":"windowingtablefunction","order by:":"_col4 DESC NULLS LAST","partition by:":"_col2"}] |
> |                       Select Operator [SEL_10] (rows=1800617 width=572) |
> |                         Output:["_col2","_col4"]   |
> |                       <-Map 1 [SIMPLE_EDGE] vectorized |
> |                         SHUFFLE [RS_39]            |
> |                           PartitionCols:_col0      |
> |                           Group By Operator [GBY_38] (rows=3226931 width=2359) |
> |                             Output:["_col0"],keys:user_id |
> |                             Filter Operator [FIL_37] (rows=3226931 width=2359) |
> |                               predicate:user_id is not null |
> |                               TableScan [TS_0] (rows=3396769 width=2359) |
> |                                 odb@test0000,test0000,Tbl:PARTIAL,Col:NONE,Output:["user_id"] |
> |                       <-Map 3 [SIMPLE_EDGE] vectorized |
> |                         SHUFFLE [RS_41]            |
> |                           PartitionCols:user_id    |
> |                           Filter Operator [FIL_40] (rows=1800617 width=120) |
> |                             predicate:((points_balance <> 0) and user_id is not null) |
> |                             TableScan [TS_7] (rows=1800617 width=120) |
> |                               odb@test1111,test1111,Tbl:COMPLETE,Col:COMPLETE,Output:["user_id","points_balance"] |
> |                                                    |
> +----------------------------------------------------+
> Mapjoin enabled: the result is correct
> set hive.auto.convert.join=true;
> select
> count(1)
> from
>   (
>     select
>       user_id
>     from
>       odb.test0000   --10524428
>     where
>       etl_dt = '20240329'
>     group by
>       user_id
>   ) a
>   JOIN (
>     select
>       user_id,
>       points_balance,
>       row_number() over(
>         partition by user_id
>         order by
>           points_balance desc
>       ) as rank_no
>     from
>       odb.test1111  --1800617
>     where
>       etl_dt = '20240329'
>       and points_balance <> 0
>   ) c on a.user_id = c.user_id
>   where  c.rank_no = 1;
>   
> +---------+
> |   _c0   |
> +---------+
> | 135417  |
> +---------+
> +----------------------------------------------------+
> |                      Explain                       |
> +----------------------------------------------------+
> | Plan optimized by CBO.                             |
> |                                                    |
> | Vertex dependency in root stage                    |
> | Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (BROADCAST_EDGE) |
> | Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)        |
> | Reducer 5 <- Map 4 (SIMPLE_EDGE)                   |
> |                                                    |
> | Stage-0                                            |
> |   Fetch Operator                                   |
> |     limit:-1                                       |
> |     Stage-1                                        |
> |       Reducer 3 vectorized                         |
> |       File Output Operator [FS_52]                 |
> |         Group By Operator [GBY_51] (rows=1 width=8) |
> |           Output:["_col0"],aggregations:["count(VALUE._col0)"] |
> |         <-Reducer 2 [CUSTOM_SIMPLE_EDGE] vectorized |
> |           PARTITION_ONLY_SHUFFLE [RS_50]           |
> |             Group By Operator [GBY_49] (rows=1 width=8) |
> |               Output:["_col0"],aggregations:["count()"] |
> |               Map Join Operator [MAPJOIN_48] (rows=1774811 width=2359) |
> |                 Conds:GBY_47._col0=RS_46._col0(Inner) |
> |               <-Reducer 5 [BROADCAST_EDGE] vectorized |
> |                 BROADCAST [RS_46]                  |
> |                   PartitionCols:_col0              |
> |                   Select Operator [SEL_45] (rows=900308 width=8) |
> |                     Output:["_col0"]               |
> |                     Filter Operator [FIL_44] (rows=900308 width=572) |
> |                       predicate:(row_number_window_0 = 1) |
> |                       PTF Operator [PTF_43] (rows=1800617 width=572) |
> |                         Function definitions:[{},{"name:":"windowingtablefunction","order by:":"_col4 DESC NULLS LAST","partition by:":"_col2"}] |
> |                         Select Operator [SEL_42] (rows=1800617 width=572) |
> |                           Output:["_col2","_col4"] |
> |                         <-Map 4 [SIMPLE_EDGE] vectorized |
> |                           SHUFFLE [RS_41]          |
> |                             PartitionCols:user_id  |
> |                             Filter Operator [FIL_40] (rows=1800617 width=120) |
> |                               predicate:((points_balance <> 0) and user_id is not null) |
> |                               TableScan [TS_7] (rows=1800617 width=120) |
> |                                 odb@test1111,test1111,Tbl:COMPLETE,Col:COMPLETE,Output:["user_id","points_balance"] |
> |               <-Group By Operator [GBY_47] (rows=1613465 width=2359) |
> |                   Output:["_col0"],keys:KEY._col0  |
> |                 <-Map 1 [SIMPLE_EDGE] vectorized   |
> |                   SHUFFLE [RS_39]                  |
> |                     PartitionCols:_col0            |
> |                     Group By Operator [GBY_38] (rows=3226931 width=2359) |
> |                       Output:["_col0"],keys:user_id |
> |                       Filter Operator [FIL_37] (rows=3226931 width=2359) |
> |                         predicate:user_id is not null |
> |                         TableScan [TS_0] (rows=3396769 width=2359) |
> |                           odb@test0000,test0000,Tbl:PARTIAL,Col:NONE,Output:["user_id"] |
> |                                                    |
> +----------------------------------------------------+
> Join key cast to string (mapjoin disabled): the result is correct
> set hive.auto.convert.join=false;
> select
> count(1)
> from
>   (
>     select
>       user_id
>     from
>       odb.test0000   --10524428
>     where
>       etl_dt = '20240329'
>     group by
>       user_id
>   ) a
>   JOIN (
>     select
>       user_id,
>       points_balance,
>       row_number() over(
>         partition by user_id
>         order by
>           points_balance desc
>       ) as rank_no
>     from
>       odb.test1111  --1800617
>     where
>       etl_dt = '20240329'
>       and points_balance <> 0
>   ) c on cast(a.user_id as string) = cast(c.user_id as string)
>   where  c.rank_no = 1;
> +---------+
> |   _c0   |
> +---------+
> | 135417  |
> +---------+
> +----------------------------------------------------+
> |                      Explain                       |
> +----------------------------------------------------+
> | Plan optimized by CBO.                             |
> |                                                    |
> | Vertex dependency in root stage                    |
> | Reducer 2 <- Map 1 (SIMPLE_EDGE)                   |
> | Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE) |
> | Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)        |
> | Reducer 6 <- Map 5 (SIMPLE_EDGE)                   |
> |                                                    |
> | Stage-0                                            |
> |   Fetch Operator                                   |
> |     limit:-1                                       |
> |     Stage-1                                        |
> |       Reducer 4 vectorized                         |
> |       File Output Operator [FS_49]                 |
> |         Group By Operator [GBY_48] (rows=1 width=8) |
> |           Output:["_col0"],aggregations:["count(VALUE._col0)"] |
> |         <-Reducer 3 [CUSTOM_SIMPLE_EDGE]           |
> |           PARTITION_ONLY_SHUFFLE [RS_21]           |
> |             Group By Operator [GBY_20] (rows=1 width=8) |
> |               Output:["_col0"],aggregations:["count()"] |
> |               Merge Join Operator [MERGEJOIN_35] (rows=1774811 width=2359) |
> |                 Conds:RS_40.CAST( _col0 AS STRING)=RS_47.CAST( _col0 AS STRING)(Inner) |
> |               <-Reducer 2 [SIMPLE_EDGE] vectorized |
> |                 SHUFFLE [RS_40]                    |
> |                   PartitionCols:CAST( _col0 AS STRING) |
> |                   Group By Operator [GBY_39] (rows=1613465 width=2359) |
> |                     Output:["_col0"],keys:KEY._col0 |
> |                   <-Map 1 [SIMPLE_EDGE] vectorized |
> |                     SHUFFLE [RS_38]                |
> |                       PartitionCols:_col0          |
> |                       Group By Operator [GBY_37] (rows=3226931 width=2359) |
> |                         Output:["_col0"],keys:user_id |
> |                         Filter Operator [FIL_36] (rows=3226931 width=2359) |
> |                           predicate:user_id is not null |
> |                           TableScan [TS_0] (rows=3396769 width=2359) |
> |                             odb@test0000,test0000,Tbl:PARTIAL,Col:NONE,Output:["user_id"] |
> |               <-Reducer 6 [SIMPLE_EDGE] vectorized |
> |                 SHUFFLE [RS_47]                    |
> |                   PartitionCols:CAST( _col0 AS STRING) |
> |                   Select Operator [SEL_46] (rows=900308 width=8) |
> |                     Output:["_col0"]               |
> |                     Filter Operator [FIL_45] (rows=900308 width=572) |
> |                       predicate:(row_number_window_0 = 1) |
> |                       PTF Operator [PTF_44] (rows=1800617 width=572) |
> |                         Function definitions:[{},{"name:":"windowingtablefunction","order by:":"_col4 DESC NULLS LAST","partition by:":"_col2"}] |
> |                         Select Operator [SEL_43] (rows=1800617 width=572) |
> |                           Output:["_col2","_col4"] |
> |                         <-Map 5 [SIMPLE_EDGE] vectorized |
> |                           SHUFFLE [RS_42]          |
> |                             PartitionCols:user_id  |
> |                             Filter Operator [FIL_41] (rows=1800617 width=120) |
> |                               predicate:((points_balance <> 0) and user_id is not null) |
> |                               TableScan [TS_7] (rows=1800617 width=120) |
> |                                 odb@test1111,test1111,Tbl:COMPLETE,Col:COMPLETE,Output:["user_id","points_balance"] |
> |                                                    |
> +----------------------------------------------------+
> ======
> Replacing the first subquery with a pre-materialized table (no group by, 
> mapjoin disabled): the result is also correct
> set hive.auto.convert.join=false;
> select
> count(1)
> from
>   (
>     select
>       user_id
>     from
>       tmp_dev.test11
>   ) a
>   JOIN (
>         select
>       user_id,
>       points_balance,
>       row_number() over(
>         partition by user_id
>         order by
>           points_balance desc
>       ) as rank_no
>     from
>       odb.test1111  --1800617
>     where
>       etl_dt = '20240329'
>       and points_balance <> 0
>   ) c on a.user_id = c.user_id;
>   where  c.rank_no = 1
>   
> +---------+
> |   _c0   |
> +---------+
> | 135417  |
> +---------+
> +----------------------------------------------------+
> |                      Explain                       |
> +----------------------------------------------------+
> | Plan optimized by CBO.                             |
> |                                                    |
> | Vertex dependency in root stage                    |
> | Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE) |
> | Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)        |
> |                                                    |
> | Stage-0                                            |
> |   Fetch Operator                                   |
> |     limit:-1                                       |
> |     Stage-1                                        |
> |       Reducer 3 vectorized                         |
> |       File Output Operator [FS_37]                 |
> |         Group By Operator [GBY_36] (rows=1 width=8) |
> |           Output:["_col0"],aggregations:["count(VALUE._col0)"] |
> |         <-Reducer 2 [CUSTOM_SIMPLE_EDGE]           |
> |           PARTITION_ONLY_SHUFFLE [RS_11]           |
> |             Group By Operator [GBY_10] (rows=1 width=8) |
> |               Output:["_col0"],aggregations:["count()"] |
> |               Merge Join Operator [MERGEJOIN_29] (rows=170303 width=8) |
> |                 Conds:RS_32._col0=RS_35._col0(Inner) |
> |               <-Map 1 [SIMPLE_EDGE] vectorized     |
> |                 SHUFFLE [RS_32]                    |
> |                   PartitionCols:_col0              |
> |                   Select Operator [SEL_31] (rows=170303 width=8) |
> |                     Output:["_col0"]               |
> |                     Filter Operator [FIL_30] (rows=170303 width=8) |
> |                       predicate:user_id is not null |
> |                       TableScan [TS_0] (rows=170303 width=8) |
> |                         tmp_dev@test11,test11,Tbl:COMPLETE,Col:COMPLETE,Output:["user_id"] |
> |               <-Map 4 [SIMPLE_EDGE] vectorized     |
> |                 SHUFFLE [RS_35]                    |
> |                   PartitionCols:_col0              |
> |                   Select Operator [SEL_34] (rows=1800617 width=8) |
> |                     Output:["_col0"]               |
> |                     Filter Operator [FIL_33] (rows=1800617 width=120) |
> |                       predicate:((points_balance <> 0) and user_id is not null) |
> |                       TableScan [TS_3] (rows=1800617 width=120) |
> |                         odb@test1111,test1111,Tbl:COMPLETE,Col:COMPLETE,Output:["user_id","points_balance"] |
> |                                                    |
> +----------------------------------------------------+
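
The counts above (67594 with the merge join, 135417 otherwise) show that rows are lost but not which ones. One possible way to narrow this down is to list the user_id values that the failing query shape drops. The following is only a sketch under stated assumptions: the tmp_dev.hive28223_* table names are hypothetical scratch tables invented for this example, and selecting user_id instead of count(1) may change the plan, so the row count of hive28223_actual should first be checked against 67594 to confirm the problem still reproduces.

```sql
-- Hypothetical scratch tables; the hive28223_* names are made up for this sketch.
set hive.auto.convert.join=false;

-- Materialize each join input on its own, so the reference join reads plain
-- tables (the report states that this form returns the correct result).
create table tmp_dev.hive28223_left as
select user_id
from odb.test0000
where etl_dt = '20240329'
group by user_id;

create table tmp_dev.hive28223_right as
select user_id
from (
  select
    user_id,
    row_number() over (partition by user_id order by points_balance desc) as rank_no
  from odb.test1111
  where etl_dt = '20240329'
    and points_balance <> 0
) t
where rank_no = 1;

-- Reference key set: join of the two materialized inputs.
create table tmp_dev.hive28223_expected as
select l.user_id
from tmp_dev.hive28223_left l
join tmp_dev.hive28223_right r on l.user_id = r.user_id;

-- Key set produced by the failing query shape (group by + windowing, merge join).
create table tmp_dev.hive28223_actual as
select a.user_id
from (
  select user_id
  from odb.test0000
  where etl_dt = '20240329'
  group by user_id
) a
join (
  select
    user_id,
    row_number() over (partition by user_id order by points_balance desc) as rank_no
  from odb.test1111
  where etl_dt = '20240329'
    and points_balance <> 0
) c on a.user_id = c.user_id
where c.rank_no = 1;

-- Keys that the reference join returns but the failing query shape does not.
select e.user_id
from tmp_dev.hive28223_expected e
left join tmp_dev.hive28223_actual x on e.user_id = x.user_id
where x.user_id is null
limit 100;
```

If the final query returns rows, a sample of those user_id values could be attached to the issue to help whoever investigates the shuffle and join behaviour.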



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
