This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 0ba5a7125808ede5d5300bb6c5d7f9f5bea8d73f Author: Riza Suminto <[email protected]> AuthorDate: Fri Jul 26 22:58:47 2024 -0700 [tools] Update plan-graph.py This patch updates plan-graph.py as follows: - Display more information in runtime filter nodes. - Parse both "Filter routing table" and "Final filter table" sections. The latter wins over the former if both exist. - Parse cardinality estimate from query plan. - Handle #Inst column in ExecSummary if it exists. - Add option to group plan nodes belonging to the same fragment. - Add various args to display/hide certain information in the graph. Change-Id: I22b0188bba3eef120c3e4b0f48408088123c4650 Reviewed-on: http://gerrit.cloudera.org:8080/21619 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- bin/diagnostics/experimental/plan-graph.py | 260 ++++++++++++++++++++++------- 1 file changed, 196 insertions(+), 64 deletions(-) diff --git a/bin/diagnostics/experimental/plan-graph.py b/bin/diagnostics/experimental/plan-graph.py index 03e06d663..74b6047f2 100755 --- a/bin/diagnostics/experimental/plan-graph.py +++ b/bin/diagnostics/experimental/plan-graph.py @@ -17,7 +17,6 @@ # specific language governing permissions and limitations # under the License -from __future__ import print_function import argparse import math import re @@ -37,7 +36,7 @@ from collections import namedtuple # If GraphViz dot program is available, we can directly create the image of the graph by # piping the output of this script to dot, such as: # -# plan-graph.py --verbosity 2 query-profile.txt | dot -T jpg -o query-plan.jpg +# plan-graph.py --verbosity 2 query-profile.txt | dot -T svg -o query-plan.svg # @@ -56,17 +55,20 @@ ExecSumm = namedtuple('ExecSumm', 'est_rows', 'peak_mem', 'est_peak_mem', 'detail']) FilterTable = namedtuple('FilterTable', ['id', 'src', 'targets', 'target_types', 'is_part_filter', 'pending', - 'first_arrived', 'completed', 'enabled', 'bloom_size', 'est_fpp']) - 
-RE_NODE = re.compile("[|\- ]*([F]?\d+:[A-Z \-]*[0-9A-Z\-]+)") -RE_RF = re.compile("[|\- ]*runtime filters: (.*)") -RE_PREDICATES = re.compile("[|\- ]*(?:hash )?predicates: (.*)") -RE_GROUPBY = re.compile("[|\- ]*group by: (.*)") -RE_BRACE = re.compile("\[([^\]]*)\]") -RE_RF_WAIT_MS = re.compile("RUNTIME_FILTER_WAIT_TIME_MS=(\d*),") -RE_RF_MAX_SIZE = re.compile("RUNTIME_FILTER_MAX_SIZE=(\d*),") -RE_RF_PLAN_SIZE = re.compile("(\d+\.\d\d [GMK]?B) \((\d+\.\d\d [GMK]?B)\)") - + 'first_arrived', 'completed', 'enabled', 'bloom_size', 'est_fpp', 'min_val', + 'max_val', 'in_list']) + +RE_NODE = re.compile(r"[|\- ]*([F]?\d+:[A-Z \-]*[0-9A-Z\-]+)") +RE_RF = re.compile(r"[|\- ]*runtime filters: (.*)") +RE_RF_TBL_HEADER = re.compile(r" ID Src. Node .*") +RE_PREDICATES = re.compile(r"[|\- ]*(?:hash )?predicates: (.*)") +RE_GROUPBY = re.compile(r"[|\- ]*group by: (.*)") +RE_BRACE = re.compile(r"\[([^\]]*)\]") +RE_RF_WAIT_MS = re.compile(r"RUNTIME_FILTER_WAIT_TIME_MS=(\d*),") +RE_RF_MAX_SIZE = re.compile(r"RUNTIME_FILTER_MAX_SIZE=(\d*),") +RE_RF_PLAN_SIZE = re.compile(r"(\d+\.\d\d [GMK]?B) \((\d+\.\d\d [GMK]?B)\)") + +# Each list entry specify task to do until a section delimiter line is found. SECTIONS = [ ParseTask(Task.DISCARD, " Query Options (set by configuration):"), @@ -83,17 +85,20 @@ SECTIONS = [ ParseTask(Task.PARSE_EXEC_SUMM, " Errors:"), ParseTask(Task.DISCARD, - " Final filter table:"), + " Filter routing table:"), ParseTask(Task.PARSE_FILTER_TABLE, " Per Node Peak Memory Usage:") ] # Color definitions. CL_E_RF = 'blue' +CL_E_BUILD = 'brown' CL_E_SCAN_TO_RF = 'orange' CL_V_BORDER = 'black' CL_V_DEF = 'lightgrey' -CL_V_JOIN_UNION = 'yellow' +CL_V_JOIN = 'yellow' +CL_V_UNION = 'yellow' +CL_V_AGG = 'yellow' CL_V_RF_OK = 'cyan' CL_V_RF_LATE = 'cyan3' CL_V_RF_UNFIT = 'cyan3' @@ -101,24 +106,25 @@ CL_V_RF_UNFIT = 'cyan3' CL_V_SHADE = [CL_V_DEF, CL_V_DEF, '#f26161', '#f03939', '#ea0909'] # Time parsing. 
-RE_H = re.compile('([0-9\.]+)h') -RE_M = re.compile('([0-9\.]+)m') -RE_S = re.compile('([0-9\.]+)s') -RE_MS = re.compile('([0-9\.]+)ms') -RE_US = re.compile('([0-9\.]+)us') -RE_NS = re.compile('([0-9\.]+)ns') +RE_H = re.compile(r'([0-9\.]+)h') +RE_M = re.compile(r'([0-9\.]+)m') +RE_S = re.compile(r'([0-9\.]+)s') +RE_MS = re.compile(r'([0-9\.]+)ms') +RE_US = re.compile(r'([0-9\.]+)us') +RE_NS = re.compile(r'([0-9\.]+)ns') MULTIPLIER = [10**-3, 1, 10**3, 10**6, 60 * 10**6, 60 * 60 * 10**6] RE_TIME = [RE_NS, RE_US, RE_MS, RE_S, RE_M, RE_H] # Other const. DEBUG = False +FILTER_COL_COUNT = 14 MAX_ATTRIB_TOK = 5 MAX_ATTRIB_TOK_LEN = 40 def debug(s): if DEBUG: - print(s) + sys.stderr.write(s + "\n") def str_to_us(line): @@ -136,7 +142,8 @@ def str_to_us(line): def str_to_byte(line): - """Parse byte string into integer.""" + """Parse byte string into integer. + For example, string '2.13 GB' will be parsed to 2287070085.""" parts = line.split(' ') byte = float(parts[0]) unit = parts[1] @@ -151,14 +158,38 @@ def str_to_byte(line): return math.trunc(byte) +def str_to_unit(line): + """Parse unit string to unit. 
Ex: 1.1K to 1100.""" + unit = line[-1] + if (unit.isnumeric()): + return math.trunc(float(line)) + + amount = float(line[:-1]) + if unit == 'K': + amount *= 10**3 + elif unit == 'M': + amount *= 10**6 + elif unit == 'B': + amount *= 10**9 + return math.trunc(amount) + + class DotParser: """Class that parses Impala plain text query profile into GraphViz DOT format.""" - def __init__(self, verbosity=0): + def __init__(self, output=sys.stdout, verbosity=0, show_estimate=False, no_time=False, + no_color=False, no_rf=False, cluster_fragment=False, + under_estimate=-1): # flags + self.out = output self.verbosity = verbosity + self.show_estimate = show_estimate + self.no_time = no_time + self.no_color = no_color + self.under_estimate = under_estimate # plan graph self.edges = defaultdict(list) + self.build_edges = defaultdict(list) self.vertices = set() self.plan_stack = [] self.plan_nodes = {} @@ -170,17 +201,21 @@ class DotParser: self.rf_wait_time = 10000 self.rf_max_size = 16 * 1024**2 self.successor_scans = defaultdict(set) + self.no_rf = no_rf # execution summary self.exec_summ_ct = 0 + self.exec_summ_has_inst = False self.exec_summ_map = {} # filter tables - self.filter_table_ct = 0 + self.filter_table_ct = -1 self.filter_table_range = [] self.filter_table_map = {} # predicates self.predicates = {} # group by self.groupby = {} + # fragment grouping + self.cluster_fragment = cluster_fragment # graphviz dot self.node_fillcolor = {} self.node_param = {} @@ -265,7 +300,7 @@ class DotParser: self.vertices.add(dest) def parse_node(self, line, match): - """Parse a line that represents query plan node.""" + """Parse a line that represent query plan node.""" i = self.get_depth(line) tok = match.split(':') node = NodePlan(i, tok[0], tok[1]) @@ -276,6 +311,8 @@ class DotParser: # this is not a root node par = self.plan_stack[-2] self.add_edge(par.id, child.id) + if 'JOIN' in par.name and 'BUILD' not in par.name and child.depth > par.depth: + 
self.build_edges[par.id].append(child.id) if 'SCAN' in child.name: for n in self.plan_stack: self.successor_scans[n.id].add(child.id) @@ -331,14 +368,25 @@ class DotParser: """Merge dictionary to a string of DOT vertex/edge attribute.""" if not d: return '' - attrib_str = ', '.join(['{}="{}"'.format(k, v) for k, v in sorted(d.items())]) + # Remove color attributes if no_color is True. + if self.no_color: + del d['fillcolor'] + del d['color'] + attrib_str = ', '.join([('{}={}' if v.startswith('<<') else '{}="{}"') + .format(k, v) for k, v in sorted(d.items())]) return ' [' + attrib_str + ']' + def draw_vertex(self, name, attrib, comment): + self.out.write('{}{};\n'.format(name, self.dict_to_dot_attrib(attrib))) + if (comment): + self.out.write(comment + '\n') + def draw_vertices(self, node_alias): """Draw the DOT vertices.""" for v_id in sorted(self.vertices): dot_name = node_alias[v_id] dot_label = '' + comment = '' attrib = {} attrib['color'] = CL_V_BORDER attrib['fillcolor'] = CL_V_DEF @@ -347,9 +395,9 @@ class DotParser: # this is a runtime filter. 
dot_label += self.runtime_filters[v_id] attrib['fillcolor'] = CL_V_RF_OK - ft = self.filter_table_map[v_id] - if ft and self.verbosity > 0: + if self.verbosity > 0 and v_id in self.filter_table_map: + ft = self.filter_table_map[v_id] # mark late RF if 'REMOTE' in ft.target_types: arrival_time = str_to_us(ft.completed) / 10.0**3 @@ -360,7 +408,7 @@ class DotParser: attrib['fillcolor'] = CL_V_RF_UNFIT for dest in self.rf_col_dest[v_id]: - dot_label += '\\n{} \=\> {}'.format(self.rf_col_source[v_id], dest) + dot_label += '\\n{} => {}'.format(self.rf_col_source[v_id], dest) if self.verbosity > 1: dot_label += '\\n' + ( @@ -370,21 +418,30 @@ class DotParser: dot_label += ', part' if ft.is_part_filter == 'true' else '' dot_label += '\\n' + ft.bloom_size + if (ft.first_arrived != 'N/A'): + comment = "// {} {} {} {}".format( + ft.id, ft.src, ft.first_arrived, ft.completed) + elif v_id in self.exec_summ_map: # This is either fragment or plan node. node = self.plan_nodes[v_id] es = self.exec_summ_map[v_id] if es: - dot_label += es.id + ":" + es.name + dot_label += '{}:{} ({})'.format(es.id, es.name, es.num_inst) else: dot_label += node.id + ":" + node.name if v_id in self.node_fillcolor: attrib['fillcolor'] = self.node_fillcolor[v_id] - if (attrib['fillcolor'] == CL_V_DEF - and ('JOIN' in node.name or 'UNION' in node.name)): - attrib['fillcolor'] = CL_V_JOIN_UNION + if (attrib['fillcolor'] == CL_V_DEF): + if ('JOIN' in node.name): + attrib['fillcolor'] = CL_V_JOIN + elif ('UNION' in node.name): + attrib['fillcolor'] = CL_V_UNION + elif ('AGGREGATE' in node.name and v_id in self.node_param + and 'FINAL' in self.node_param[v_id]): + attrib['fillcolor'] = CL_V_AGG if (self.verbosity > 1 or (self.verbosity > 0 and 'SCAN' in node.name)): if v_id in self.node_param: @@ -403,14 +460,29 @@ class DotParser: dot_label += '\\ngroupby(' + group + ')' if es: - dot_label += '\\n' + es.avg_time + if not self.no_time: + dot_label += '\\n' + es.avg_time + if self.verbosity > 1: + dot_label 
+= ', ' + es.max_time + if es.num_rows != "-": - attrib['xlabel'] = es.num_rows - if self.verbosity > 1: - dot_label += ', ' + es.num_inst + " inst" + edge_label = es.num_rows + if self.show_estimate: + edge_label += ' ({})'.format(es.est_rows) + row_actual = str_to_unit(es.num_rows) + row_est = str_to_unit(es.est_rows) + if (self.under_estimate > 0): + threshold = self.under_estimate * row_est + if (row_actual > threshold): + edge_label = "<<font color='red'>{}</font>>".format(edge_label) + attrib['xlabel'] = edge_label attrib['label'] = dot_label - print('{}{};'.format(dot_name, self.dict_to_dot_attrib(attrib))) + self.draw_vertex(dot_name, attrib, comment) + + def draw_edge(self, ori, dest, attrib): + self.out.write('{} -> {}{};\n'.format( + ori, dest, self.dict_to_dot_attrib(attrib))) def draw_edges(self, node_alias): """Draw the DOT edges.""" @@ -419,12 +491,15 @@ class DotParser: attrib = {} ori = node_alias[k] dest = node_alias[d] - if k.startswith('RF') or d.startswith('RF'): + if (k.startswith('RF') or d.startswith('RF')): attrib['color'] = CL_E_RF else: # Other edges that does not touch RF node should be drawn backward. attrib['dir'] = 'back' - print('{} -> {}{};'.format(ori, dest, self.dict_to_dot_attrib(attrib))) + if ((k in self.build_edges) and (d in self.build_edges[k])): + # Highlight join build edge with different color. + attrib['color'] = CL_E_BUILD + self.draw_edge(ori, dest, attrib) def draw_dependency_edges(self, node_alias): """Draw dependency edges from scanner vertices to runtime filter vertices. 
@@ -445,7 +520,23 @@ class DotParser: attrib['color'] = CL_E_SCAN_TO_RF else: attrib['style'] = 'invis' - print('{} -> {}{};'.format(ori, dest, self.dict_to_dot_attrib(attrib))) + self.draw_edge(ori, dest, attrib) + + def draw_fragment_cluster(self): + for f_id in sorted(self.vertices): + if not f_id.startswith('F'): + continue + cluster = [f_id] + par = [f_id] + while par: + childs = [] + for p in par: + for c in self.edges[p]: + if not (c.startswith('F') or c.startswith('R')): + childs.append(c) + cluster.append('N' + c) + par = childs + self.out.write('subgraph cluster_{} {{ {} }}\n'.format(f_id, '; '.join(cluster))) def draw(self): """Draw the DOT format.""" @@ -457,16 +548,18 @@ class DotParser: alias = 'N' + v_id if v_id.isnumeric() else v_id node_alias[v_id] = alias - print('digraph G {') - print('node [style=filled];') - print('rankdir = TB;') + self.out.write('digraph G {\n') + self.out.write('node [style=filled];\n') + self.out.write('rankdir = TB;\n\n') self.draw_vertices(node_alias) - print('\n') + self.out.write('\n') self.draw_edges(node_alias) if self.verbosity > 1: - print('\n') + self.out.write('\n\n') self.draw_dependency_edges(node_alias) - print('}') + if self.cluster_fragment: + self.draw_fragment_cluster() + self.out.write('}\n') def parse_options(self, line): """Parse query option section. @@ -491,7 +584,7 @@ class DotParser: if m: self.parse_node(line, m.group(1)) return - if self.verbosity > 0: + if self.verbosity > 0 and not self.no_rf: m = RE_RF.match(line) if m: self.parse_rf(m.group(1)) @@ -513,6 +606,9 @@ class DotParser: This section begins with 'ExecSummary:' line in query profile.""" self.exec_summ_ct += 1 if self.exec_summ_ct <= 2: + # IMPALA-4618 separate column "#Hosts" and "#Inst". + # Verify if we have column "#Inst" or not. 
+ self.exec_summ_has_inst |= (line.find("#Inst") >= 0) return parts = list( @@ -520,13 +616,16 @@ class DotParser: filter(None, line.strip().replace("|", " ").replace("--", " ").split(" ")))) + if not self.exec_summ_has_inst: + parts.insert(1, parts[1]) if len(parts) == 7: + # some operator might not have "#Rows" and "Est. #Rows" columns. parts.insert(5, "-") parts.insert(6, "-") - assert len(parts) >= 9 if len(parts) == 9: + # some operator might not have "Detail". parts.append("-") - assert len(parts) == 10 + assert len(parts) == 10, parts # split id and name from parts[0]. tok = parts[0].split(':') @@ -541,20 +640,29 @@ class DotParser: def parse_filter_table(self, line): """Parse the filter table section. This section begins with 'Final filter table:' line in query profile.""" - self.filter_table_ct += 1 - if self.filter_table_ct == 1: + if RE_RF_TBL_HEADER.match(line): + self.filter_table_ct = 0 + + if self.filter_table_ct >= 0: + self.filter_table_ct += 1 + + if self.filter_table_ct < 0 or self.filter_table_ct == 2: + return + elif self.filter_table_ct == 1: # Tokenize the table header to get the column boundaries headers = list( map(lambda x: x.strip(), filter(None, line.strip().split(" ")))) + debug(headers) begin = 0 for head in headers: end = line.find(head) + len(head) self.filter_table_range.append((begin, end)) begin = end - assert len(self.filter_table_range) >= 11 - - if self.filter_table_ct <= 2: + for i in range(0, FILTER_COL_COUNT - len(headers)): + self.filter_table_range.append((end, end)) + assert len(self.filter_table_range) >= FILTER_COL_COUNT, self.filter_table_range + debug(self.filter_table_range) return parts = [] @@ -566,10 +674,15 @@ class DotParser: # test to parse the first column into int. If fails, it means we arrived at the # end of final filter table. if not tok.isnumeric(): + self.filter_table_ct = -1 return - parts.append(line[begin:end].strip()) + if begin == end: + # This is a missing column. Append nothing to parts. 
+ parts.append('') + else: + parts.append(line[begin:end].strip()) parts[0] = 'RF{:0>3}'.format(parts[0]) - ft = FilterTable._make(parts[0:11]) + ft = FilterTable._make(parts[0:FILTER_COL_COUNT]) debug(ft) self.filter_table_map[ft.id] = ft @@ -596,9 +709,9 @@ class DotParser: for line in inf: if i > len(SECTIONS) - 1: break - if (line.rstrip().startswith(SECTIONS[i].delimiter) or - (not SECTIONS[i].delimiter and not line.rstrip())): - debug('Found delimiter ' + SECTIONS[i].delimiter + ' in ' + line) + if (line.rstrip().startswith(SECTIONS[i].delimiter) + or (not SECTIONS[i].delimiter and not line.rstrip())): + debug('Found delimiter {} in {}'.format(SECTIONS[i].delimiter, line)) i += 1 continue if SECTIONS[i].task == Task.DISCARD: @@ -617,16 +730,35 @@ class DotParser: def main(): parser = argparse.ArgumentParser( - description="This script reads Impala plain text query profile and outputs the \ + description="This script read Impala plain text query profile and output the \ query plan in GraphViz DOT format.") parser.add_argument("infile", nargs="?", type=argparse.FileType('r'), default=sys.stdin, help="Impala query profile in plain text format. stdin will \ be read if argument is not supplied.") - parser.add_argument("--verbosity", type=int, default=2, choices=range(0, 4), + parser.add_argument('-o', '--output', action='store', + type=argparse.FileType('w'), dest='output', default=sys.stdout, + help="Directs the output to a name of your choice") + parser.add_argument('-v', '--verbosity', type=int, default=2, choices=range(0, 4), help='Verbosity level of produced graph. 
Default to 2.') + parser.add_argument('-e', '--estimate', default=False, action='store_true', + help="Show estimated rows alongside the returned rows in edge label.") + parser.add_argument('--no-time', default=False, action='store_true', + help="Do not show timing information.") + parser.add_argument('--no-color', default=False, action='store_true', + help="Remove custom coloring.") + parser.add_argument('--no-rf', default=False, action='store_true', + help="Remove runtime filter vertices.") + parser.add_argument('--cluster-fragment', default=False, action='store_true', + help="Group vertices belonging to the same fragment into same cluster.") + parser.add_argument('--under-estimate', default=50, type=int, + help="The ratio between actual vs estimated rows returned to highlight as " + "underestimation. Default to 50, which will highlight query operator that " + "returns more than 50 times of rows than the estimate. " + "Require --estimate=True.") args = parser.parse_args() - dp = DotParser(args.verbosity) + dp = DotParser(args.output, args.verbosity, args.estimate, args.no_time, + args.no_color, args.no_rf, args.cluster_fragment, args.under_estimate) dp.parse(args.infile)
