This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 0ba5a7125808ede5d5300bb6c5d7f9f5bea8d73f
Author: Riza Suminto <[email protected]>
AuthorDate: Fri Jul 26 22:58:47 2024 -0700

    [tools] Update plan-graph.py
    
    This patch updates plan-graph.py as follows:
    - Display more information in runtime filter nodes.
    - Parse both "Filter routing table" and "Final filter table" sections.
      The latter wins over the former if both exist.
    - Parse cardinality estimate from query plan.
    - Handle the #Inst column in ExecSummary if it exists.
    - Add option to group plan nodes belonging to the same fragment.
    - Add various args to display/hide certain information in graph.
    
    Change-Id: I22b0188bba3eef120c3e4b0f48408088123c4650
    Reviewed-on: http://gerrit.cloudera.org:8080/21619
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 bin/diagnostics/experimental/plan-graph.py | 260 ++++++++++++++++++++++-------
 1 file changed, 196 insertions(+), 64 deletions(-)

diff --git a/bin/diagnostics/experimental/plan-graph.py 
b/bin/diagnostics/experimental/plan-graph.py
index 03e06d663..74b6047f2 100755
--- a/bin/diagnostics/experimental/plan-graph.py
+++ b/bin/diagnostics/experimental/plan-graph.py
@@ -17,7 +17,6 @@
 # specific language governing permissions and limitations
 # under the License
 
-from __future__ import print_function
 import argparse
 import math
 import re
@@ -37,7 +36,7 @@ from collections import namedtuple
 # If GraphViz dot program is available, we can directly create the image of 
the graph by
 # piping the output of this script to dot, such as:
 #
-#   plan-graph.py --verbosity 2 query-profile.txt | dot -T jpg -o 
query-plan.jpg
+#   plan-graph.py --verbosity 2 query-profile.txt | dot -T svg -o 
query-plan.svg
 #
 
 
@@ -56,17 +55,20 @@ ExecSumm = namedtuple('ExecSumm',
       'est_rows', 'peak_mem', 'est_peak_mem', 'detail'])
 FilterTable = namedtuple('FilterTable',
     ['id', 'src', 'targets', 'target_types', 'is_part_filter', 'pending',
-      'first_arrived', 'completed', 'enabled', 'bloom_size', 'est_fpp'])
-
-RE_NODE = re.compile("[|\- ]*([F]?\d+:[A-Z \-]*[0-9A-Z\-]+)")
-RE_RF = re.compile("[|\- ]*runtime filters: (.*)")
-RE_PREDICATES = re.compile("[|\- ]*(?:hash )?predicates: (.*)")
-RE_GROUPBY = re.compile("[|\- ]*group by: (.*)")
-RE_BRACE = re.compile("\[([^\]]*)\]")
-RE_RF_WAIT_MS = re.compile("RUNTIME_FILTER_WAIT_TIME_MS=(\d*),")
-RE_RF_MAX_SIZE = re.compile("RUNTIME_FILTER_MAX_SIZE=(\d*),")
-RE_RF_PLAN_SIZE = re.compile("(\d+\.\d\d [GMK]?B) \((\d+\.\d\d [GMK]?B)\)")
-
+      'first_arrived', 'completed', 'enabled', 'bloom_size', 'est_fpp', 
'min_val',
+      'max_val', 'in_list'])
+
+RE_NODE = re.compile(r"[|\- ]*([F]?\d+:[A-Z \-]*[0-9A-Z\-]+)")
+RE_RF = re.compile(r"[|\- ]*runtime filters: (.*)")
+RE_RF_TBL_HEADER = re.compile(r" ID  Src. Node .*")
+RE_PREDICATES = re.compile(r"[|\- ]*(?:hash )?predicates: (.*)")
+RE_GROUPBY = re.compile(r"[|\- ]*group by: (.*)")
+RE_BRACE = re.compile(r"\[([^\]]*)\]")
+RE_RF_WAIT_MS = re.compile(r"RUNTIME_FILTER_WAIT_TIME_MS=(\d*),")
+RE_RF_MAX_SIZE = re.compile(r"RUNTIME_FILTER_MAX_SIZE=(\d*),")
+RE_RF_PLAN_SIZE = re.compile(r"(\d+\.\d\d [GMK]?B) \((\d+\.\d\d [GMK]?B)\)")
+
+# Each list entry specify task to do until a section delimiter line is found.
 SECTIONS = [
     ParseTask(Task.DISCARD,
       "    Query Options (set by configuration):"),
@@ -83,17 +85,20 @@ SECTIONS = [
     ParseTask(Task.PARSE_EXEC_SUMM,
       "    Errors:"),
     ParseTask(Task.DISCARD,
-      "    Final filter table:"),
+      "    Filter routing table:"),
     ParseTask(Task.PARSE_FILTER_TABLE,
       "    Per Node Peak Memory Usage:")
   ]
 
 # Color definitions.
 CL_E_RF = 'blue'
+CL_E_BUILD = 'brown'
 CL_E_SCAN_TO_RF = 'orange'
 CL_V_BORDER = 'black'
 CL_V_DEF = 'lightgrey'
-CL_V_JOIN_UNION = 'yellow'
+CL_V_JOIN = 'yellow'
+CL_V_UNION = 'yellow'
+CL_V_AGG = 'yellow'
 CL_V_RF_OK = 'cyan'
 CL_V_RF_LATE = 'cyan3'
 CL_V_RF_UNFIT = 'cyan3'
@@ -101,24 +106,25 @@ CL_V_RF_UNFIT = 'cyan3'
 CL_V_SHADE = [CL_V_DEF, CL_V_DEF, '#f26161', '#f03939', '#ea0909']
 
 # Time parsing.
-RE_H = re.compile('([0-9\.]+)h')
-RE_M = re.compile('([0-9\.]+)m')
-RE_S = re.compile('([0-9\.]+)s')
-RE_MS = re.compile('([0-9\.]+)ms')
-RE_US = re.compile('([0-9\.]+)us')
-RE_NS = re.compile('([0-9\.]+)ns')
+RE_H = re.compile(r'([0-9\.]+)h')
+RE_M = re.compile(r'([0-9\.]+)m')
+RE_S = re.compile(r'([0-9\.]+)s')
+RE_MS = re.compile(r'([0-9\.]+)ms')
+RE_US = re.compile(r'([0-9\.]+)us')
+RE_NS = re.compile(r'([0-9\.]+)ns')
 MULTIPLIER = [10**-3, 1, 10**3, 10**6, 60 * 10**6, 60 * 60 * 10**6]
 RE_TIME = [RE_NS, RE_US, RE_MS, RE_S, RE_M, RE_H]
 
 # Other const.
 DEBUG = False
+FILTER_COL_COUNT = 14
 MAX_ATTRIB_TOK = 5
 MAX_ATTRIB_TOK_LEN = 40
 
 
 def debug(s):
   if DEBUG:
-    print(s)
+    sys.stderr.write(s + "\n")
 
 
 def str_to_us(line):
@@ -136,7 +142,8 @@ def str_to_us(line):
 
 
 def str_to_byte(line):
-  """Parse byte string into integer."""
+  """Parse byte string into integer.
+  For example, string '2.13 GB' will be parsed to 2287070085."""
   parts = line.split(' ')
   byte = float(parts[0])
   unit = parts[1]
@@ -151,14 +158,38 @@ def str_to_byte(line):
   return math.trunc(byte)
 
 
+def str_to_unit(line):
+  """Parse unit string to unit. Ex: 1.1K to 1100."""
+  unit = line[-1]
+  if (unit.isnumeric()):
+    return math.trunc(float(line))
+
+  amount = float(line[:-1])
+  if unit == 'K':
+    amount *= 10**3
+  elif unit == 'M':
+    amount *= 10**6
+  elif unit == 'B':
+    amount *= 10**9
+  return math.trunc(amount)
+
+
 class DotParser:
   """Class that parses Impala plain text query profile into GraphViz DOT 
format."""
 
-  def __init__(self, verbosity=0):
+  def __init__(self, output=sys.stdout, verbosity=0, show_estimate=False, 
no_time=False,
+      no_color=False, no_rf=False, cluster_fragment=False,
+      under_estimate=-1):
     # flags
+    self.out = output
     self.verbosity = verbosity
+    self.show_estimate = show_estimate
+    self.no_time = no_time
+    self.no_color = no_color
+    self.under_estimate = under_estimate
     # plan graph
     self.edges = defaultdict(list)
+    self.build_edges = defaultdict(list)
     self.vertices = set()
     self.plan_stack = []
     self.plan_nodes = {}
@@ -170,17 +201,21 @@ class DotParser:
     self.rf_wait_time = 10000
     self.rf_max_size = 16 * 1024**2
     self.successor_scans = defaultdict(set)
+    self.no_rf = no_rf
     # execution summary
     self.exec_summ_ct = 0
+    self.exec_summ_has_inst = False
     self.exec_summ_map = {}
     # filter tables
-    self.filter_table_ct = 0
+    self.filter_table_ct = -1
     self.filter_table_range = []
     self.filter_table_map = {}
     # predicates
     self.predicates = {}
     # group by
     self.groupby = {}
+    # fragment grouping
+    self.cluster_fragment = cluster_fragment
     # graphviz dot
     self.node_fillcolor = {}
     self.node_param = {}
@@ -265,7 +300,7 @@ class DotParser:
     self.vertices.add(dest)
 
   def parse_node(self, line, match):
-    """Parse a line that represents query plan node."""
+    """Parse a line that represent query plan node."""
     i = self.get_depth(line)
     tok = match.split(':')
     node = NodePlan(i, tok[0], tok[1])
@@ -276,6 +311,8 @@ class DotParser:
       # this is not a root node
       par = self.plan_stack[-2]
       self.add_edge(par.id, child.id)
+      if 'JOIN' in par.name and 'BUILD' not in par.name and child.depth > 
par.depth:
+        self.build_edges[par.id].append(child.id)
       if 'SCAN' in child.name:
         for n in self.plan_stack:
           self.successor_scans[n.id].add(child.id)
@@ -331,14 +368,25 @@ class DotParser:
     """Merge dictionary to a string of DOT vertex/edge attribute."""
     if not d:
       return ''
-    attrib_str = ', '.join(['{}="{}"'.format(k, v) for k, v in 
sorted(d.items())])
+    # Remove color attributes if no_color is True.
+    if self.no_color:
+      del d['fillcolor']
+      del d['color']
+    attrib_str = ', '.join([('{}={}' if v.startswith('<<') else '{}="{}"')
+      .format(k, v) for k, v in sorted(d.items())])
     return ' [' + attrib_str + ']'
 
+  def draw_vertex(self, name, attrib, comment):
+    self.out.write('{}{};\n'.format(name, self.dict_to_dot_attrib(attrib)))
+    if (comment):
+      self.out.write(comment + '\n')
+
   def draw_vertices(self, node_alias):
     """Draw the DOT vertices."""
     for v_id in sorted(self.vertices):
       dot_name = node_alias[v_id]
       dot_label = ''
+      comment = ''
       attrib = {}
       attrib['color'] = CL_V_BORDER
       attrib['fillcolor'] = CL_V_DEF
@@ -347,9 +395,9 @@ class DotParser:
         # this is a runtime filter.
         dot_label += self.runtime_filters[v_id]
         attrib['fillcolor'] = CL_V_RF_OK
-        ft = self.filter_table_map[v_id]
 
-        if ft and self.verbosity > 0:
+        if self.verbosity > 0 and v_id in self.filter_table_map:
+          ft = self.filter_table_map[v_id]
           # mark late RF
           if 'REMOTE' in ft.target_types:
             arrival_time = str_to_us(ft.completed) / 10.0**3
@@ -360,7 +408,7 @@ class DotParser:
             attrib['fillcolor'] = CL_V_RF_UNFIT
 
           for dest in self.rf_col_dest[v_id]:
-            dot_label += '\\n{} \=\> {}'.format(self.rf_col_source[v_id], dest)
+            dot_label += '\\n{} => {}'.format(self.rf_col_source[v_id], dest)
 
           if self.verbosity > 1:
             dot_label += '\\n' + (
@@ -370,21 +418,30 @@ class DotParser:
             dot_label += ', part' if ft.is_part_filter == 'true' else ''
             dot_label += '\\n' + ft.bloom_size
 
+          if (ft.first_arrived != 'N/A'):
+            comment = "// {} {} {} {}".format(
+                ft.id, ft.src, ft.first_arrived, ft.completed)
+
       elif v_id in self.exec_summ_map:
         # This is either fragment or plan node.
         node = self.plan_nodes[v_id]
         es = self.exec_summ_map[v_id]
         if es:
-          dot_label += es.id + ":" + es.name
+          dot_label += '{}:{} ({})'.format(es.id, es.name, es.num_inst)
         else:
           dot_label += node.id + ":" + node.name
 
         if v_id in self.node_fillcolor:
           attrib['fillcolor'] = self.node_fillcolor[v_id]
 
-        if (attrib['fillcolor'] == CL_V_DEF
-              and ('JOIN' in node.name or 'UNION' in node.name)):
-          attrib['fillcolor'] = CL_V_JOIN_UNION
+        if (attrib['fillcolor'] == CL_V_DEF):
+          if ('JOIN' in node.name):
+            attrib['fillcolor'] = CL_V_JOIN
+          elif ('UNION' in node.name):
+            attrib['fillcolor'] = CL_V_UNION
+          elif ('AGGREGATE' in node.name and v_id in self.node_param
+              and 'FINAL' in self.node_param[v_id]):
+            attrib['fillcolor'] = CL_V_AGG
 
         if (self.verbosity > 1 or (self.verbosity > 0 and 'SCAN' in 
node.name)):
           if v_id in self.node_param:
@@ -403,14 +460,29 @@ class DotParser:
             dot_label += '\\ngroupby(' + group + ')'
 
         if es:
-          dot_label += '\\n' + es.avg_time
+          if not self.no_time:
+            dot_label += '\\n' + es.avg_time
+            if self.verbosity > 1:
+              dot_label += ', ' + es.max_time
+
           if es.num_rows != "-":
-            attrib['xlabel'] = es.num_rows
-          if self.verbosity > 1:
-            dot_label += ', ' + es.num_inst + " inst"
+            edge_label = es.num_rows
+            if self.show_estimate:
+              edge_label += ' ({})'.format(es.est_rows)
+              row_actual = str_to_unit(es.num_rows)
+              row_est = str_to_unit(es.est_rows)
+              if (self.under_estimate > 0):
+                threshold = self.under_estimate * row_est
+                if (row_actual > threshold):
+                  edge_label = "<<font 
color='red'>{}</font>>".format(edge_label)
+            attrib['xlabel'] = edge_label
 
       attrib['label'] = dot_label
-      print('{}{};'.format(dot_name, self.dict_to_dot_attrib(attrib)))
+      self.draw_vertex(dot_name, attrib, comment)
+
+  def draw_edge(self, ori, dest, attrib):
+    self.out.write('{} -> {}{};\n'.format(
+      ori, dest, self.dict_to_dot_attrib(attrib)))
 
   def draw_edges(self, node_alias):
     """Draw the DOT edges."""
@@ -419,12 +491,15 @@ class DotParser:
         attrib = {}
         ori = node_alias[k]
         dest = node_alias[d]
-        if k.startswith('RF') or d.startswith('RF'):
+        if (k.startswith('RF') or d.startswith('RF')):
           attrib['color'] = CL_E_RF
         else:
           # Other edges that does not touch RF node should be drawn backward.
           attrib['dir'] = 'back'
-        print('{} -> {}{};'.format(ori, dest, self.dict_to_dot_attrib(attrib)))
+          if ((k in self.build_edges) and (d in self.build_edges[k])):
+            # Highlight join build edge with different color.
+            attrib['color'] = CL_E_BUILD
+        self.draw_edge(ori, dest, attrib)
 
   def draw_dependency_edges(self, node_alias):
     """Draw dependency edges from scanner vertices to runtime filter vertices.
@@ -445,7 +520,23 @@ class DotParser:
           attrib['color'] = CL_E_SCAN_TO_RF
         else:
           attrib['style'] = 'invis'
-        print('{} -> {}{};'.format(ori, dest, self.dict_to_dot_attrib(attrib)))
+        self.draw_edge(ori, dest, attrib)
+
+  def draw_fragment_cluster(self):
+    for f_id in sorted(self.vertices):
+      if not f_id.startswith('F'):
+        continue
+      cluster = [f_id]
+      par = [f_id]
+      while par:
+        childs = []
+        for p in par:
+          for c in self.edges[p]:
+            if not (c.startswith('F') or c.startswith('R')):
+              childs.append(c)
+              cluster.append('N' + c)
+        par = childs
+      self.out.write('subgraph cluster_{} {{ {} }}\n'.format(f_id, '; 
'.join(cluster)))
 
   def draw(self):
     """Draw the DOT format."""
@@ -457,16 +548,18 @@ class DotParser:
         alias = 'N' + v_id if v_id.isnumeric() else v_id
         node_alias[v_id] = alias
 
-    print('digraph G {')
-    print('node [style=filled];')
-    print('rankdir = TB;')
+    self.out.write('digraph G {\n')
+    self.out.write('node [style=filled];\n')
+    self.out.write('rankdir = TB;\n\n')
     self.draw_vertices(node_alias)
-    print('\n')
+    self.out.write('\n')
     self.draw_edges(node_alias)
     if self.verbosity > 1:
-      print('\n')
+      self.out.write('\n\n')
       self.draw_dependency_edges(node_alias)
-    print('}')
+    if self.cluster_fragment:
+      self.draw_fragment_cluster()
+    self.out.write('}\n')
 
   def parse_options(self, line):
     """Parse query option section.
@@ -491,7 +584,7 @@ class DotParser:
     if m:
       self.parse_node(line, m.group(1))
       return
-    if self.verbosity > 0:
+    if self.verbosity > 0 and not self.no_rf:
       m = RE_RF.match(line)
       if m:
         self.parse_rf(m.group(1))
@@ -513,6 +606,9 @@ class DotParser:
     This section begins with 'ExecSummary:' line in query profile."""
     self.exec_summ_ct += 1
     if self.exec_summ_ct <= 2:
+      # IMPALA-4618 separate column "#Hosts" and "#Inst".
+      # Verify if we have column "#Inst" or not.
+      self.exec_summ_has_inst |= (line.find("#Inst") >= 0)
       return
 
     parts = list(
@@ -520,13 +616,16 @@ class DotParser:
           filter(None,
             line.strip().replace("|", " ").replace("--", "  ").split("  "))))
 
+    if not self.exec_summ_has_inst:
+      parts.insert(1, parts[1])
     if len(parts) == 7:
+      # some operator might not have "#Rows" and "Est. #Rows" columns.
       parts.insert(5, "-")
       parts.insert(6, "-")
-    assert len(parts) >= 9
     if len(parts) == 9:
+      # some operator might not have "Detail".
       parts.append("-")
-    assert len(parts) == 10
+    assert len(parts) == 10, parts
 
     # split id and name from parts[0].
     tok = parts[0].split(':')
@@ -541,20 +640,29 @@ class DotParser:
   def parse_filter_table(self, line):
     """Parse the filter table section.
     This section begins with 'Final filter table:' line in query profile."""
-    self.filter_table_ct += 1
-    if self.filter_table_ct == 1:
+    if RE_RF_TBL_HEADER.match(line):
+      self.filter_table_ct = 0
+
+    if self.filter_table_ct >= 0:
+      self.filter_table_ct += 1
+
+    if self.filter_table_ct < 0 or self.filter_table_ct == 2:
+      return
+    elif self.filter_table_ct == 1:
       # Tokenize the table header to get the column boundaries
       headers = list(
           map(lambda x: x.strip(),
             filter(None, line.strip().split("  "))))
+      debug(headers)
       begin = 0
       for head in headers:
         end = line.find(head) + len(head)
         self.filter_table_range.append((begin, end))
         begin = end
-      assert len(self.filter_table_range) >= 11
-
-    if self.filter_table_ct <= 2:
+      for i in range(0, FILTER_COL_COUNT - len(headers)):
+        self.filter_table_range.append((end, end))
+      assert len(self.filter_table_range) >= FILTER_COL_COUNT, 
self.filter_table_range
+      debug(self.filter_table_range)
       return
 
     parts = []
@@ -566,10 +674,15 @@ class DotParser:
         # test to parse the first column into int. If fails, it means we 
arrived at the
         # end of final filter table.
         if not tok.isnumeric():
+          self.filter_table_ct = -1
           return
-      parts.append(line[begin:end].strip())
+      if begin == end:
+        # This is a missing column. Append nothing to parts.
+        parts.append('')
+      else:
+        parts.append(line[begin:end].strip())
     parts[0] = 'RF{:0>3}'.format(parts[0])
-    ft = FilterTable._make(parts[0:11])
+    ft = FilterTable._make(parts[0:FILTER_COL_COUNT])
     debug(ft)
     self.filter_table_map[ft.id] = ft
 
@@ -596,9 +709,9 @@ class DotParser:
     for line in inf:
       if i > len(SECTIONS) - 1:
         break
-      if (line.rstrip().startswith(SECTIONS[i].delimiter) or
-            (not SECTIONS[i].delimiter and not line.rstrip())):
-        debug('Found delimiter ' + SECTIONS[i].delimiter + ' in ' + line)
+      if (line.rstrip().startswith(SECTIONS[i].delimiter)
+          or (not SECTIONS[i].delimiter and not line.rstrip())):
+        debug('Found delimiter {} in {}'.format(SECTIONS[i].delimiter, line))
         i += 1
         continue
       if SECTIONS[i].task == Task.DISCARD:
@@ -617,16 +730,35 @@ class DotParser:
 
 def main():
   parser = argparse.ArgumentParser(
-      description="This script reads Impala plain text query profile and 
outputs the \
+      description="This script read Impala plain text query profile and output 
the \
           query plan in GraphViz DOT format.")
   parser.add_argument("infile", nargs="?", type=argparse.FileType('r'),
       default=sys.stdin, help="Impala query profile in plain text format. 
stdin will \
           be read if argument is not supplied.")
-  parser.add_argument("--verbosity", type=int, default=2, choices=range(0, 4),
+  parser.add_argument('-o', '--output', action='store',
+      type=argparse.FileType('w'), dest='output', default=sys.stdout,
+      help="Directs the output to a name of your choice")
+  parser.add_argument('-v', '--verbosity', type=int, default=2, 
choices=range(0, 4),
       help='Verbosity level of produced graph. Default to 2.')
+  parser.add_argument('-e', '--estimate', default=False, action='store_true',
+      help="Show estimated rows alongside the returned rows in edge label.")
+  parser.add_argument('--no-time', default=False, action='store_true',
+      help="Do not show timing information.")
+  parser.add_argument('--no-color', default=False, action='store_true',
+      help="Remove custom coloring.")
+  parser.add_argument('--no-rf', default=False, action='store_true',
+      help="Remove runtime filter vertices.")
+  parser.add_argument('--cluster-fragment', default=False, action='store_true',
+      help="Group vertices belonging to the same fragment into same cluster.")
+  parser.add_argument('--under-estimate', default=50, type=int,
+      help="The ratio between actual vs estimated rows returned to highlight 
as "
+      "underestimation. Default to 50, which will highlight query operator 
that "
+      "returns more than 50 times of rows than the estimate. "
+      "Require --estimate=True.")
   args = parser.parse_args()
 
-  dp = DotParser(args.verbosity)
+  dp = DotParser(args.output, args.verbosity, args.estimate, args.no_time,
+      args.no_color, args.no_rf, args.cluster_fragment, args.under_estimate)
   dp.parse(args.infile)
 
 

Reply via email to