[ 
https://issues.apache.org/jira/browse/HIVE-28620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

László Bodor updated HIVE-28620:
--------------------------------
    Description: 
HIVE-25907 fixed a case where an INSERT OVERWRITE DIRECTORY (IOWD) query didn't 
write the data to the final path when the query results cache was enabled. The 
fix checked QB.parseInfo at a certain point in the semantic analysis and 
wrote/read a flag on a QB instance. The problem is that the query producing the 
data to be inserted can be of arbitrary complexity, and the fix only took care 
of queries like:
{code}
INSERT OVERWRITE DIRECTORY "<destination directory>" SELECT * FROM iowd;
{code}
but for queries like this:
{code}
EXPLAIN EXTENDED INSERT OVERWRITE DIRECTORY '/tmp'
SELECT a13.CATEGORY_ID CATEGORY_ID,
       max(a14.CATEGORY_DESC) CATEGORY_DESC,
       a12.SUBCAT_ID SUBCAT_ID,
       max(a13.SUBCAT_LONG_DESC) SUBCAT_DESC,
       avg((a11.QTY_SOLD * (a11.UNIT_PRICE - a11.DISCOUNT))) WJXBFS1,
       sum((a11.QTY_SOLD * a11.UNIT_COST)) WJXBFS2
FROM ORDER_DETAIL a11
JOIN LU_ITEM a12 ON (a11.ITEM_ID = a12.ITEM_ID)
JOIN LU_SUBCATEG a13 ON (a12.SUBCAT_ID = a13.SUBCAT_ID)
JOIN LU_CATEGORY a14 ON (a13.CATEGORY_ID = a14.CATEGORY_ID)
GROUP BY a13.CATEGORY_ID, a12.SUBCAT_ID;
{code}
it doesn't work. The root cause is that the flag is set on [one QB 
instance|https://github.com/apache/hive/blob/883d5dfe25929ba5dcac635e752adf5561d28402/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java#L1836],
 but during plan generation genPlan is called recursively, and this.qb doesn't 
refer to the main/root QB, as 
[setQB|https://github.com/apache/hive/blob/883d5dfe25929ba5dcac635e752adf5561d28402/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java#L12435]
 is called at every step. So when the recursion reaches the file sink plan 
generation, the QB instance passed in is not the one on which the 
isInsertOverwriteDirectory flag was set.
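
To make the root cause concrete, here is a minimal, self-contained model of the 
flag handling (a hypothetical Java sketch with illustrative names, not the 
actual SemanticAnalyzer code): the flag lives only on the root QB, while plan 
generation recurses into the sub-QBs, so the QB seen at file sink generation no 
longer carries it.
{code}
import java.util.List;

// Hypothetical stand-in for org.apache.hadoop.hive.ql.parse.QB
class QB {
  final boolean isInsertOverwriteDirectory; // set only on the root QB
  final List<QB> subQueries;

  QB(boolean iowd, List<QB> subs) {
    isInsertOverwriteDirectory = iowd;
    subQueries = subs;
  }
}

public class QbFlagSketch {
  // mirrors the buggy check: it consults the *current* QB, not the root
  static boolean useResultsCache(QB qb) {
    return !qb.isInsertOverwriteDirectory;
  }

  static void genPlan(QB qb) {
    for (QB sub : qb.subQueries) {
      genPlan(sub); // in Hive, setQB() rebinds this.qb at each recursion step
    }
    if (qb.subQueries.isEmpty()) {
      // the file sink is generated for an inner QB whose flag was never set
      System.out.println("file sink uses results cache: " + useResultsCache(qb));
    }
  }

  public static void main(String[] args) {
    QB inner = new QB(false, List.of());     // the SELECT producing the data
    QB root  = new QB(true, List.of(inner)); // the IOWD statement: flag set here
    genPlan(root); // prints "true": the cache is used despite the root flag
  }
}
{code}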

I propose passing the root QB along with the current subquery's QB, so that 
file sink generation can decide whether to use the query result cache based on 
the root QB. This way there is no need to "clone" the flag value into the 
sub-QBs.
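
Sketching that direction on the hypothetical model above (the real 
genPlan/setQB signatures in SemanticAnalyzer differ), the root QB simply 
travels with the recursion:
{code}
// Hypothetical continuation of the QbFlagSketch model above, not the actual
// Hive signatures: the root QB is passed along with the current sub-QB.
static boolean useResultsCache(QB current, QB root) {
  // the decision consults the root QB; 'current' stays available for the
  // other per-subquery decisions made during file sink generation
  return !root.isInsertOverwriteDirectory;
}

static void genPlan(QB qb, QB root) {
  for (QB sub : qb.subQueries) {
    genPlan(sub, root); // the root QB travels unchanged through the recursion
  }
  if (qb.subQueries.isEmpty()) {
    System.out.println("file sink uses results cache: " + useResultsCache(qb, root));
  }
}
{code}
With this variant, genPlan(root, root) prints "false" for the IOWD query: the 
cache is bypassed regardless of which sub-QB reaches the file sink.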

When this issue is present, an EXPLAIN EXTENDED reveals the wrong strategy of 
such a query when the result cache is enabled; here is a snippet:
{code}
|         Reducer 5                                  |
...
|                   File Output Operator             |
|                     bucketingVersion: 2            |
|                     compressed: false              |
|                     GlobalTableId: 0               |
|                     directory: file:/efs/tmp/hive/_resultscache_/results-c7e3efee-ce91-41f5-9ce3-f95ec4d23f66/0de7c837-2868-4441-90ac-40c35a2f1d8d/.hive-staging_hive_2024-11-09_16-27-23_402_2565121622084463581-8/-ext-10000 |
|                     NumFilesPerFileSink: 1         |
|                     Statistics: Num rows: 3892258870 Data size: 46678382057 Basic stats: COMPLETE Column stats: NONE |
|                     Stats Publishing Key Prefix: file:/efs/tmp/hive/_resultscache_/results-c7e3efee-ce91-41f5-9ce3-f95ec4d23f66/0de7c837-2868-4441-90ac-40c35a2f1d8d/.hive-staging_hive_2024-11-09_16-27-23_402_2565121622084463581-8/-ext-10000/ |
|                     table:                         |
|                         input format: org.apache.hadoop.mapred.TextInputFormat |
|                         output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat |
|                         properties:                |
|                           bucketing_version -1     |
|                           columns _col0,_col1,_col2,_col3,_col4,_col5 |
|                           columns.types int:string:int:string:double:double |
|                           serialization.format 1   |
|                           serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe |
|                         serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe |
|                     TotalFiles: 1                  |
|                     GatherStats: false             |
|                     MultiFileSpray: false          |
|                                                    |
|   Stage: Stage-2                                   |
|     Dependency Collection                          |
|                                                    |
|   Stage: Stage-0                                   |
|     Move Operator                                  |
|       files:                                       |
|           hdfs directory: true                     |
|           source: file:/efs/tmp/hive/_resultscache_/results-c7e3efee-ce91-41f5-9ce3-f95ec4d23f66/0de7c837-2868-4441-90ac-40c35a2f1d8d/.hive-staging_hive_2024-11-09_16-27-23_402_2565121622084463581-8/-ext-10000 |
|           destination: file:/efs/tmp/hive/_resultscache_/results-c7e3efee-ce91-41f5-9ce3-f95ec4d23f66/0de7c837-2868-4441-90ac-40c35a2f1d8d |
|                                                    |
+----------------------------------------------------+
{code}
As the IOWD query was considered cacheable at compile time, the 
FileSinkOperator ended up writing to the query result cache, and the MoveTask 
then moved the data to a final path that is also inside the query cache, 
instead of the expected folder ('/tmp' in this example).
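
The path mix-up itself is mechanical; a hypothetical one-liner (illustrative 
names, not Hive's API) captures what the buggy plan above shows:
{code}
// Hypothetical sketch: if the query is (wrongly) treated as cacheable, both the
// file sink's staging directory and the MoveTask destination are resolved under
// the results-cache directory instead of under the IOWD target.
static String moveDestination(boolean cacheable, String cacheDir, String iowdDest) {
  return cacheable ? cacheDir : iowdDest; // the buggy plan takes the first branch
}
{code}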

So not only does the output hit the wrong location, but all the IOWD data is 
written to and moved around on the query result cache's filesystem (which 
shows up as a serious performance regression on cloud storage when throughput 
limits are hit).


Whatever the solution is, it needs to bypass the query result cache for IOWD, 
so that the original behavior is restored and the files go to the right place:
{code}
|   Stage: Stage-0                                   |
|     Move Operator                                  |
|       files:                                       |
|           hdfs directory: true                     |
|           source: s3a://somebucket/user/hive/.hive-staging_hive_2024-11-09_16-28-27_281_5911898841946485672-9/-ext-10000 |
|           destination: s3a://somebucket/tmp        |
|                                                    |
+----------------------------------------------------+
{code}



> Query result is cached in case of IOWD if the subquery is not trivial
> ---------------------------------------------------------------------
>
>                 Key: HIVE-28620
>                 URL: https://issues.apache.org/jira/browse/HIVE-28620
>             Project: Hive
>          Issue Type: Bug
>      Security Level: Public (Viewable by anyone)
>            Reporter: László Bodor
>            Assignee: László Bodor
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
