[ https://issues.apache.org/jira/browse/IMPALA-14843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18080183#comment-18080183 ]

ASF subversion and git services commented on IMPALA-14843:
----------------------------------------------------------

Commit 9582b8bff2a013309507905e21a1020a72bacee4 in impala's branch 
refs/heads/master from stiga-huang
[ https://gitbox.apache.org/repos/asf?p=impala.git;h=9582b8bff ]

IMPALA-14843: Show cancelled nodes in ExecSummary

In backend execution, a plan node instance could be closed before it
produces all its output rows. For example, when a UnionNode reaches its
output limit, all of its child operands are closed regardless of their
states (prepared, opened, executing, etc.). In the ExecSummary shown in
the query profile or WebUI, such nodes usually show 0 as the output
cardinality with no further indication, which is confusing. This patch
adds a "CANCELLED" marker in the Detail column to identify such nodes,
i.e., nodes that haven't reached the "Last Batch Returned" state.

Implementation:
 - Adds a last_batch_returned_ flag in ExecNode to indicate whether the
   "Last Batch Returned" event has been added for this node.
 - For ExecNodes under a SubplanNode, uses the flag of the top-level
   SubplanNode instead, since nodes in a subplan are reset multiple
   times.
 - Extends ExecSummaryDataPB and TExecStats with a last_batch_returned
   field to carry this state. ExecSummaryDataPB is used by executors
   when reporting execution status to the coordinator; TExecStats is
   used by the coordinator to track execution stats for plan nodes.
 - When generating the status report for the coordinator, the executor
   mirrors the flag into ExecSummaryDataPB for each node. To look up the
   ExecNode instance by plan node id, a map is added in
   FragmentInstanceState when the ExecNode tree is built.
 - The coordinator appends the CANCELLED marker when returning
   TExecSummary only if the query has finished execution. This avoids
   showing CANCELLED for in-flight queries.
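
A rough sketch of the flag's path from executor to coordinator, under heavy
simplification. ExecNodeSketch, NodeReport, BuildReport, DetailWithMarker and
the enclosing_subplan pointer are hypothetical names, not Impala's actual
classes or methods; NodeReport merely stands in for ExecSummaryDataPB, and the
Detail text produced for a node with no other detail is an assumption.

```cpp
#include <map>
#include <string>
#include <vector>

// Hypothetical, heavily simplified stand-in for an ExecNode: only what is
// needed to illustrate the last_batch_returned_ flag is modelled.
struct ExecNodeSketch {
  int plan_node_id;
  // Outermost (top-level) SubplanNode containing this node, or nullptr.
  ExecNodeSketch* enclosing_subplan = nullptr;
  bool last_batch_returned_ = false;

  // Called when the node records its "Last Batch Returned" lifecycle event.
  void MarkLastBatchReturned() { last_batch_returned_ = true; }

  // Nodes inside a subplan are reset many times, so their own flag is not
  // meaningful; defer to the top-level SubplanNode's flag instead.
  bool LastBatchReturned() const {
    return enclosing_subplan != nullptr ? enclosing_subplan->last_batch_returned_
                                        : last_batch_returned_;
  }
};

// Per-node entry an executor sends to the coordinator (hypothetical stand-in
// for ExecSummaryDataPB).
struct NodeReport {
  int plan_node_id;
  bool last_batch_returned;
};

// Executor side: walk the plan-node-id -> node map built along with the
// ExecNode tree and mirror each node's flag into the report.
std::vector<NodeReport> BuildReport(
    const std::map<int, ExecNodeSketch*>& node_map) {
  std::vector<NodeReport> report;
  for (const auto& [id, node] : node_map) {
    report.push_back({id, node->LastBatchReturned()});
  }
  return report;
}

// Coordinator side: add the marker only once the query has finished, so a
// node that simply has not returned its last batch yet is not mislabelled.
std::string DetailWithMarker(const std::string& detail,
                             bool last_batch_returned, bool query_finished) {
  if (!query_finished || last_batch_returned) return detail;
  return detail.empty() ? "CANCELLED" : detail + ", CANCELLED";
}
```

Gating the marker on query completion on the coordinator side is what keeps a
node that is merely still running from being reported as CANCELLED. How the
real code combines the flags of multiple instances of the same plan node is
not described here and is not modelled.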

Example:
For the query
 "with l as
    (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
  select STRAIGHT_JOIN count(*)
  from (select * from tpch.lineitem a LIMIT 1) a
  join (select * from l LIMIT 125000) b
  on a.l_orderkey = -b.l_orderkey"

the ExecSummary is
Operator                 #Hosts  #Inst   Avg Time   Max Time    #Rows  Est. #Rows   Peak Mem  Est. Peak Mem  Detail
---------------------------------------------------------------------------------------------------------------------------------------
F01:ROOT                      1      1    8.647us    8.647us                               0              0
05:AGGREGATE                  1      1    0.000ns    0.000ns        1           1   24.00 KB       16.00 KB  FINALIZE
04:HASH JOIN                  1      1  251.664us  251.664us        0           1    9.06 MB        4.75 MB  INNER JOIN, BROADCAST
|--08:EXCHANGE                1      1  427.557us  427.557us  125.00K     125.00K  232.00 KB      337.52 KB  UNPARTITIONED
|  F05:EXCHANGE SENDER        1      1    3.007ms    3.007ms                        54.05 KB       48.00 KB
|  07:EXCHANGE                1      1  499.833us  499.833us  125.00K     125.00K    5.12 MB      361.52 KB  UNPARTITIONED
|  F04:EXCHANGE SENDER        3      3    2.681ms    2.833ms                        54.05 KB       48.00 KB
|  01:UNION                   3      3   43.817us   48.670us  375.00K     125.00K          0              0
|  |--03:SCAN HDFS            3      3   40.710us   49.726us        0       6.00M          0      264.00 MB  tpch.lineitem, CANCELLED
|  02:SCAN HDFS               3      3    5.813ms    7.886ms  377.86K       6.00M   48.23 MB      264.00 MB  tpch.lineitem, CANCELLED
06:EXCHANGE                   1      1   10.288us   10.288us        1           1   16.00 KB       16.00 KB  UNPARTITIONED
F00:EXCHANGE SENDER           3      3   32.298us   35.572us                         31.00 B       48.00 KB
00:SCAN HDFS                  3      3   76.692ms   85.439ms        3           1   48.07 MB      264.00 MB  tpch.lineitem a

Note that "02:SCAN HDFS" and "03:SCAN HDFS" are cancelled because
"01:UNION" reaches its limit.

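A toy C++ model of why both scans end up without a "Last Batch Returned"
event. It is a hypothetical simplification (ChildSketch and RunUnionWithLimit
are illustrative names, not Impala's UnionNode), and the 1024-row batch size
and the even 2,000,000-row share of tpch.lineitem per instance are
assumptions. The union drains its operands in order and stops pulling once
the limit is reached (the 375.00K rows "01:UNION" reports across 3 instances
suggest the limit is applied per instance), leaving the first operand
partially consumed and the second never opened.

```cpp
#include <algorithm>
#include <vector>

// Hypothetical child operand that yields `total_rows` rows in fixed-size
// batches and records whether it ever returned its last batch.
struct ChildSketch {
  long long total_rows;
  long long produced = 0;
  bool opened = false;
  bool last_batch_returned = false;

  long long GetNext(long long batch_size) {
    opened = true;
    long long n = std::min(batch_size, total_rows - produced);
    produced += n;
    if (produced == total_rows) last_batch_returned = true;
    return n;
  }
};

// Toy pass-through UNION ALL with a LIMIT: drain operands in order and stop
// pulling as soon as the limit is reached. The remaining operands are then
// closed in whatever state they are in, so their flags stay false.
long long RunUnionWithLimit(std::vector<ChildSketch>& children,
                            long long limit, long long batch_size) {
  long long pulled = 0;
  for (ChildSketch& child : children) {
    while (pulled < limit && !child.last_batch_returned) {
      pulled += child.GetNext(batch_size);
    }
    if (pulled >= limit) break;
  }
  // Rows of the final batch beyond the limit are discarded.
  return std::min(pulled, limit);
}
```

Under these assumptions the first operand stops after 123 batches, i.e.
125,952 rows per instance; across 3 instances that is 377,856 rows, which is
consistent with the 377.86K rows reported for "02:SCAN HDFS" above.
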
Test:
 - Added e2e tests in query_test/test_observability.py

Assisted-by: Claude Sonnet 4.5
Change-Id: Ibd1bf51112e7c8c5609bb9e9525d960efc524215
Reviewed-on: http://gerrit.cloudera.org:8080/24118
Reviewed-by: Michael Smith <[email protected]>
Reviewed-by: Jason Fehr <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>


> Show cancelled nodes in ExecSummary
> -----------------------------------
>
>                 Key: IMPALA-14843
>                 URL: https://issues.apache.org/jira/browse/IMPALA-14843
>             Project: IMPALA
>          Issue Type: New Feature
>          Components: Backend
>            Reporter: Quanlong Huang
>            Assignee: Quanlong Huang
>            Priority: Major
>
> PlanNode execution could be cancelled when its parent doesn't need more rows,
> e.g. when the parent node reaches its limit. Currently, users have to detect
> this by checking the "Node Lifecycle Event Timeline" in the profile to see
> whether the node is opened and closed as expected. Take the following query
> as an example:
> {code:sql}
> with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
> select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
>     join (select * from l LIMIT 125000) b on a.l_orderkey = -b.l_orderkey{code}
> The UNION ALL operation has a limit of 125000, which is less than the number
> of rows in tpch.lineitem (6M), so the second union operand won't be executed.
> In the ExecSummary, we just show 0 as its output rows (see the
> "03:SCAN HDFS" line):
> {noformat}
> Operator                 #Hosts  #Inst   Avg Time   Max Time    #Rows  Est. #Rows   Peak Mem  Est. Peak Mem  Detail
> -----------------------------------------------------------------------------------------------------------------------------------
> F01:ROOT                      1      1   16.379us   16.379us                               0              0
> 05:AGGREGATE                  1      1    0.000ns    0.000ns        1           1   24.00 KB       16.00 KB  FINALIZE
> 04:HASH JOIN                  1      1  348.292us  348.292us        0           1    9.06 MB        4.75 MB  INNER JOIN, BROADCAST
> |--08:EXCHANGE                1      1  407.110us  407.110us  125.00K     125.00K  288.00 KB      337.52 KB  UNPARTITIONED
> |  F05:EXCHANGE SENDER        1      1    1.834ms    1.834ms                        54.05 KB       48.00 KB
> |  07:EXCHANGE                1      1  221.991us  221.991us  125.00K     125.00K  184.00 KB      361.52 KB  UNPARTITIONED
> |  F04:EXCHANGE SENDER        3      3    3.476ms    5.611ms                        54.05 KB       48.00 KB
> |  01:UNION                   3      3   76.304us  121.649us  375.00K     125.00K          0              0
> |  |--03:SCAN HDFS            3      3   33.598us   43.943us        0       6.00M          0      264.00 MB  tpch.lineitem
> |  02:SCAN HDFS               3      3   83.305ms   98.254ms  377.86K       6.00M   48.17 MB      264.00 MB  tpch.lineitem
> 06:EXCHANGE                   1      1    8.285us    8.285us        1           1   16.00 KB       16.00 KB  UNPARTITIONED
> F00:EXCHANGE SENDER           3      3   32.625us   40.147us                         31.00 B       48.00 KB
> 00:SCAN HDFS                  3      3  169.596ms  174.375ms        3           1   48.08 MB      264.00 MB  tpch.lineitem a{noformat}
> Users who only check the query plan would be confused, since neither
> ScanNode has any predicates.
> {noformat}
> |  01:UNION
> |  |  pass-through-operands: all
> |  |  limit: 125000
> |  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
> |  |  tuple-ids=4 row-size=8B cardinality=125.00K
> |  |  in pipelines: 02(GETNEXT), 03(GETNEXT)
> |  |
> |  |--03:SCAN HDFS [tpch.lineitem, RANDOM]
> |  |     HDFS partitions=1/1 files=1 size=718.94MB
> |  |     stored statistics:
> |  |       table: rows=6.00M size=718.94MB
> |  |       columns: all
> |  |     extrapolated-rows=disabled max-scan-range-rows=1.07M
> |  |     mem-estimate=264.00MB mem-reservation=8.00MB thread-reservation=1
> |  |     tuple-ids=3 row-size=8B cardinality=6.00M
> |  |     in pipelines: 03(GETNEXT)
> |  |
> |  02:SCAN HDFS [tpch.lineitem, RANDOM]
> |     HDFS partitions=1/1 files=1 size=718.94MB
> |     stored statistics:
> |       table: rows=6.00M size=718.94MB
> |       columns: all
> |     extrapolated-rows=disabled max-scan-range-rows=1.07M
> |     mem-estimate=264.00MB mem-reservation=8.00MB thread-reservation=1
> |     tuple-ids=2 row-size=8B cardinality=6.00M
> |     in pipelines: 02(GETNEXT){noformat}
> The only indication is in the lifecycle timeline of ScanNode (id=3):
> {noformat}
>           HDFS_SCAN_NODE (id=3):
>             Table Name: tpch.lineitem
>             Hdfs split stats (<volume id>:<# splits>/<split lengths>): 0:2/256.00 MB
>             ExecOption: TEXT Codegen Enabled
>             Node Lifecycle Event Timeline: 139.781ms
>                - Closed: 139.781ms (139.781ms){noformat}
> It is never opened; it is just closed directly. A normal timeline looks
> like this:
> {noformat}
>           Node Lifecycle Event Timeline: 196.153ms
>              - Open Started: 21.666ms (21.666ms)
>              - Open Finished: 21.697ms (31.313us)
>              - First Batch Requested: 21.702ms (5.330us)
>              - First Batch Returned: 195.956ms (174.254ms)
>              - Last Batch Returned: 195.957ms (187.000ns)
>              - Closed: 196.153ms (196.518us){noformat}
> It'd be helpful to add a "cancelled" marker in the ExecSummary to indicate
> that a node is not fully executed, i.e. it doesn't have a "Last Batch
> Returned" event. This is also helpful for HBO when tracking the output
> cardinality of ScanNodes: such ScanNodes have incomplete output, so they
> should be skipped.
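
The check proposed in the description can be sketched as a predicate over a
node's lifecycle events, plus the kind of filtering an HBO consumer might
apply. IsCancelled, ScanObservation and CompleteObservations are hypothetical
helpers, and modelling the timeline as a list of event names is an assumption
for illustration; the committed change tracks the same condition with a
last_batch_returned_ flag instead of inspecting the profile.

```cpp
#include <algorithm>
#include <iterator>
#include <string>
#include <vector>

// Hypothetical check: a node counts as fully executed only if its lifecycle
// timeline contains a "Last Batch Returned" event.
bool IsCancelled(const std::vector<std::string>& lifecycle_events) {
  return std::find(lifecycle_events.begin(), lifecycle_events.end(),
                   "Last Batch Returned") == lifecycle_events.end();
}

// Hypothetical record of one scan node's observed output.
struct ScanObservation {
  int plan_node_id;
  long long output_rows;
  std::vector<std::string> lifecycle_events;
};

// Keep only observations whose output cardinality is complete, i.e. drop
// cancelled nodes before using their row counts as statistics.
std::vector<ScanObservation> CompleteObservations(
    const std::vector<ScanObservation>& observations) {
  std::vector<ScanObservation> complete;
  std::copy_if(observations.begin(), observations.end(),
               std::back_inserter(complete),
               [](const ScanObservation& o) {
                 return !IsCancelled(o.lifecycle_events);
               });
  return complete;
}
```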



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
