This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit f533225915d78e1a2a3d1e6606cf4f6d064151d2
Author: Riza Suminto <[email protected]>
AuthorDate: Sat Nov 9 15:44:24 2024 -0800

    IMPALA-13543: single_node_perf_run.py must accept tpcds_partitioned

    The tpcds_partitioned dataset is a fully-partitioned version of the
    tpcds dataset (the latter only partitions the store_sales table). It
    does not have a default text format database like the tpcds dataset
    does. Instead, it relies on the pre-existence of the text format tpcds
    database, whose tables are then loaded via INSERT OVERWRITE into the
    equivalent tpcds_partitioned database. It does not have its own query
    set; instead it is symlinked to share testdata/workloads/tpcds/queries.
    It also has a slightly different schema from the tpcds dataset: column
    "c_last_review_date" in the tpcds dataset is "c_last_review_date_sk" in
    tpcds_partitioned (TPC-DS v2.11.0, section 2.4.7). These differences
    made tpcds_partitioned ineligible for perf-AB-test
    (single_node_perf_run.py).

    This patch updates single_node_perf_run.py and related scripts to make
    tpcds_partitioned eligible as a benchmark dataset. It adds an initial
    step that loads the text tpcds database at the selected scale before
    running the load script for the tpcds_partitioned dataset. The compute
    stats step is also limited to run one query at a time so the cluster is
    not overadmitted with concurrent compute stats queries. A helper
    function build_replacement_params() was created inside
    generate-schema-statements.py to factor out the common parameter-map
    construction.

    Testing:
    - Ran perf-AB-test-ub2004 with this commit included and confirmed the
      benchmark works with the tpcds_partitioned dataset.
    - Ran normal data loading. Passed FE tests and
      query_test/test_tpcds_queries.py.

    Change-Id: I4b6f435705dcf873696ffd151052ebeab35d9898
    Reviewed-on: http://gerrit.cloudera.org:8080/22061
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 bin/single_node_perf_run.py                        |  28 ++-
 testdata/bin/generate-schema-statements.py         | 208 +++++++++++++--------
 .../tpcds_partitioned_schema_template.sql          |  48 ++---
 tests/util/test_file_parser.py                     |  11 +-
 4 files changed, 184 insertions(+), 111 deletions(-)

diff --git a/bin/single_node_perf_run.py b/bin/single_node_perf_run.py index 4255ac7b4..4f77977df 100755 --- a/bin/single_node_perf_run.py +++ b/bin/single_node_perf_run.py @@ -88,6 +88,7 @@ import textwrap from tests.common.test_dimensions import TableFormatInfo IMPALA_HOME = os.environ["IMPALA_HOME"] +IMPALA_PERF_RESULTS = os.path.join(IMPALA_HOME, "perf_results") def configured_call(cmd): @@ -100,14 +101,17 @@ def configured_call(cmd): def load_data(db_to_load, table_formats, scale): """Loads a database with a particular scale factor.""" + all_formats = ("text/none," + table_formats if "text/none" not in table_formats + else table_formats) configured_call(["{0}/bin/load-data.py".format(IMPALA_HOME), "--workloads", db_to_load, "--scale_factor", str(scale), - "--table_formats", "text/none," + table_formats]) + "--table_formats", all_formats]) for table_format in table_formats.split(","): suffix = TableFormatInfo.create_from_string(None, table_format).db_suffix() db_name = db_to_load + scale + suffix configured_call(["{0}/tests/util/compute_table_stats.py".format(IMPALA_HOME), - "--stop_on_error", "--db_names", db_name]) + "--stop_on_error", "--db_names", db_name, + "--parallelism", "1"]) def get_git_hash_for_name(name): @@ -164,7 +168,7 @@ def run_workload(base_dir, workloads, options): def report_benchmark_results(file_a, file_b, description): """Wrapper around report_benchmark_result.py.""" - result = 
"{0}/perf_results/latest/performance_result.txt".format(IMPALA_HOME) + result = os.path.join(IMPALA_PERF_RESULTS, "latest", "performance_result.txt") with open(result, "w") as f: subprocess.check_call( ["{0}/tests/benchmark/report_benchmark_results.py".format(IMPALA_HOME), @@ -266,7 +270,7 @@ def perf_ab_test(options, args): hash_a = get_git_hash_for_name(args[0]) # Create the base directory to store the results in - results_path = os.path.join(IMPALA_HOME, "perf_results") + results_path = IMPALA_PERF_RESULTS if not os.access(results_path, os.W_OK): os.makedirs(results_path) @@ -284,12 +288,20 @@ def perf_ab_test(options, args): start_minicluster() start_impala(options.num_impalads, options) - workloads = set(options.workloads.split(",")) + workloads = options.workloads.split(",") if options.load: - WORKLOAD_TO_DATASET = {"tpch": "tpch", "tpcds": "tpcds", "targeted-perf": "tpch", - "tpcds-unmodified": "tpcds-unmodified"} - datasets = set([WORKLOAD_TO_DATASET[workload] for workload in workloads]) + WORKLOAD_TO_DATASET = { + "tpch": "tpch", + "tpcds": "tpcds", + "targeted-perf": "tpch", + "tpcds-unmodified": "tpcds-unmodified", + "tpcds_partitioned": "tpcds_partitioned" + } + datasets = [WORKLOAD_TO_DATASET[workload] for workload in workloads] + if "tpcds_partitioned" in datasets and "tpcds" not in datasets: + # "tpcds_partitioned" require the text "tpcds" database. + load_data("tpcds", "text/none", options.scale) for dataset in datasets: load_data(dataset, options.table_formats, options.scale) diff --git a/testdata/bin/generate-schema-statements.py b/testdata/bin/generate-schema-statements.py index 2361fd2d6..195094c49 100755 --- a/testdata/bin/generate-schema-statements.py +++ b/testdata/bin/generate-schema-statements.py @@ -112,7 +112,7 @@ from tests.common.test_dimensions import ( parser = OptionParser() parser.add_option("-e", "--exploration_strategy", dest="exploration_strategy", - default="core", help="The exploration strategy for schema gen: 'core',"\ + default="core", help="The exploration strategy for schema gen: 'core'," " 'pairwise', or 'exhaustive'") parser.add_option("--hive_warehouse_dir", dest="hive_warehouse_dir", default="/test-warehouse", @@ -122,16 +122,16 @@ parser.add_option("-w", "--workload", dest="workload", parser.add_option("-s", "--scale_factor", dest="scale_factor", default="", help="An optional scale factor to generate the schema for") parser.add_option("-f", "--force_reload", dest="force_reload", action="store_true", - default= False, help='Skips HDFS exists check and reloads all tables') + default=False, help='Skips HDFS exists check and reloads all tables') parser.add_option("-v", "--verbose", dest="verbose", action="store_true", - default = False, help="If set, outputs additional logging.") + default=False, help="If set, outputs additional logging.") parser.add_option("-b", "--backend", dest="backend", default="localhost:21000", help="Backend connection to use, default: localhost:21000") parser.add_option("--table_names", dest="table_names", default=None, - help="Only load the specified tables - specified as a comma-seperated "\ + help="Only load the specified tables - specified as a comma-seperated " "list of base table names") parser.add_option("--table_formats", dest="table_formats", default=None, - help="Override the test vectors and load using the specified table "\ + help="Override the test vectors and load using the specified table " "formats. Ex. 
--table_formats=seq/snap/block,text/none") parser.add_option("--hdfs_namenode", dest="hdfs_namenode", default="localhost:20500", help="HDFS name node for Avro schema URLs, default localhost:20500") @@ -142,11 +142,12 @@ if options.workload is None: parser.print_help() sys.exit(1) -WORKLOAD_DIR = os.path.join(os.environ['IMPALA_HOME'], 'testdata', 'workloads') -DATASET_DIR = os.path.join(os.environ['IMPALA_HOME'], 'testdata', 'datasets') +IMPALA_HOME = os.getenv("IMPALA_HOME") +WORKLOAD_DIR = os.path.join(IMPALA_HOME, 'testdata', 'workloads') +DATASET_DIR = os.path.join(IMPALA_HOME, 'testdata', 'datasets') SQL_OUTPUT_DIR = os.environ['IMPALA_DATA_LOADING_SQL_DIR'] AVRO_SCHEMA_DIR = "avro_schemas" -DEFAULT_FS=os.environ['DEFAULT_FS'] +DEFAULT_FS = os.environ['DEFAULT_FS'] IMPALA_SUPPORTED_INSERT_FORMATS = ['parquet', 'hbase', 'text', 'kudu'] IMPALA_PARQUET_COMPRESSION_MAP = \ @@ -178,18 +179,19 @@ DATASET_IDX = 1 CODEC_IDX = 2 COMPRESSION_TYPE_IDX = 3 -COMPRESSION_MAP = {'def': 'org.apache.hadoop.io.compress.DefaultCodec', - 'gzip': 'org.apache.hadoop.io.compress.GzipCodec', - 'bzip': 'org.apache.hadoop.io.compress.BZip2Codec', - 'snap': 'org.apache.hadoop.io.compress.SnappyCodec', - 'none': '' - } +COMPRESSION_MAP = { + 'def': 'org.apache.hadoop.io.compress.DefaultCodec', + 'gzip': 'org.apache.hadoop.io.compress.GzipCodec', + 'bzip': 'org.apache.hadoop.io.compress.BZip2Codec', + 'snap': 'org.apache.hadoop.io.compress.SnappyCodec', + 'none': '' +} AVRO_COMPRESSION_MAP = { 'def': 'deflate', 'snap': 'snappy', 'none': '', - } +} HIVE_TO_AVRO_TYPE_MAP = { 'STRING': 'string', @@ -206,7 +208,7 @@ HIVE_TO_AVRO_TYPE_MAP = { # in our tests. 'TIMESTAMP': 'string', 'BINARY': 'bytes', - } +} PARQUET_ALTER_STATEMENT = "ALTER TABLE %(table_name)s SET\n\ SERDEPROPERTIES ('blocksize' = '1073741824', 'compression' = '%(compression)s');" @@ -230,9 +232,47 @@ PARTITIONED_INSERT_RE = re.compile( HINT_SHUFFLE = "/* +shuffle, clustered */" +# Variable names in the sql statement will be replaced by the actual values. +# Use build_replacement_params() to create such variable:value map. +VAR_DB_NAME = "db_name" +VAR_DB_SUFFIX = "db_suffix" +VAR_TABLE_NAME = "table_name" +VAR_HDFS_LOCATION = "hdfs_location" +VAR_IMPALA_HOME = "impala_home" +VAR_HINT = "hint" +VAR_PART_PREDICATE = "part_predicate" +VAR_FILE_FORMAT = "file_format" +VAR_SCALE_FACTOR = "scale_factor" + + +def build_replacement_params( + db_name, db_suffix, table_name, hdfs_location=None, impala_home=None, + hint=None, part_predicate=None, file_format=None, scale_factor=None): + """Create a variable:value map for SQL replacement based on given parameters. + db_name, db_suffix, and table_name is mandatory. 
Others are optional.""" + params = { + VAR_DB_NAME: db_name, + VAR_DB_SUFFIX: db_suffix, + VAR_TABLE_NAME: table_name + } + if hdfs_location is not None: + params[VAR_HDFS_LOCATION] = hdfs_location + if impala_home is not None: + params[VAR_IMPALA_HOME] = impala_home + if hint is not None: + params[VAR_HINT] = hint + if part_predicate is not None: + params[VAR_PART_PREDICATE] = part_predicate + if file_format is not None: + params[VAR_FILE_FORMAT] = file_format + if scale_factor is not None: + params[VAR_SCALE_FACTOR] = scale_factor + return params + + def build_create_statement(table_template, table_name, db_name, db_suffix, - file_format, compression, hdfs_location, - force_reload, is_hive_stmt): + file_format, hdfs_location, force_reload, is_hive_stmt, + scale_factor=""): create_stmt = '' if (force_reload): tbl_type = 'TABLE' @@ -247,12 +287,14 @@ def build_create_statement(table_template, table_name, db_name, db_suffix, # Remove location part from the format string table_template = table_template.replace("LOCATION '{hdfs_location}'", "") - stmt = table_template.format( - db_name=db_name, - db_suffix=db_suffix, - table_name=table_name, + params = build_replacement_params( + db_name, + db_suffix, + table_name, file_format=FILE_FORMAT_TO_STORED_AS_MAP[file_format], - hdfs_location=hdfs_location) + hdfs_location=hdfs_location, + scale_factor=scale_factor) + stmt = table_template.format(**params) # Apache Hive 3.1 doesn't support "STORED BY ICEBERG STORED AS AVRO" and # "STORED AS JSONFILE" (HIVE-25162, HIVE-19899) if is_hive_stmt and os.environ['USE_APACHE_HIVE'] == "true": @@ -299,9 +341,9 @@ def parse_table_properties(file_format, table_properties): tblproperties = {} TABLE_PROPERTY_RE = re.compile( # Optional '<data-format>:' prefix, capturing just the 'data-format' part. - r'(?:(\w+):)?' + + r'(?:(\w+):)?' # Required key=value, capturing the key and value - r'(.+?)=(.*)') + + r'(.+?)=(.*)') for table_property in [_f for _f in table_properties.split("\n") if _f]: m = TABLE_PROPERTY_RE.match(table_property) if not m: @@ -408,13 +450,14 @@ LOCATION '{{hdfs_location}}' partitioned_by=partitioned_by, tblproperties=tblproperties_clause, file_format_string=file_format_string - ).strip() + ).strip() # Remove empty lines from the stmt string. There is an empty line for # each of the sections that didn't have anything (e.g. partitioned_by) stmt = os.linesep.join([s for s in stmt.splitlines() if s]) stmt += ';' return stmt + def build_hbase_create_stmt_in_hive(columns, partition_columns, table_name): # The hbase create statement differs sufficiently from the generic create to justify a # separate method. Specifically, STORED AS becomes STORED BY. 
There is section called @@ -439,15 +482,16 @@ def build_hbase_create_stmt_in_hive(columns, partition_columns, table_name): columns=',\n'.join(columns), hbase_column_mapping=hbase_column_mapping, tbl_properties=tbl_properties, - ).strip() + ).strip() return stmt + ';' + def avro_schema(columns): record = { - "name": "a", # doesn't matter + "name": "a", # doesn't matter "type": "record", "fields": list() - } + } for column_spec in columns.strip().split('\n'): # column_spec looks something like "col_name col_type COMMENT comment" # (comment may be omitted, we don't use it) @@ -460,7 +504,7 @@ def avro_schema(columns): precision = 9 else: # Parse out scale and precision from decimal type - m = re.search("DECIMAL\((?P<precision>.*),(?P<scale>.*)\)", column_spec.upper()) + m = re.search(r"DECIMAL\((?P<precision>.*),(?P<scale>.*)\)", column_spec.upper()) assert m, "Could not parse decimal column spec: " + column_spec scale = int(m.group('scale')) precision = int(m.group('precision')) @@ -477,10 +521,11 @@ def avro_schema(columns): record["fields"].append( {'name': name, - 'type': [type, "null"]}) # all columns nullable + 'type': [type, "null"]}) # all columns nullable return json.dumps(record) + def build_compression_codec_statement(codec, compression_type, file_format): codec = (AVRO_COMPRESSION_MAP if file_format == 'avro' else COMPRESSION_MAP).get(codec) if not codec: @@ -488,6 +533,7 @@ def build_compression_codec_statement(codec, compression_type, file_format): return (AVRO_COMPRESSION_CODEC % codec) if file_format == 'avro' else ( COMPRESSION_TYPE % compression_type.upper() + '\n' + COMPRESSION_CODEC % codec) + def build_codec_enabled_statement(codec): compression_enabled = 'false' if codec == 'none' else 'true' return COMPRESSION_ENABLED % compression_enabled @@ -505,25 +551,25 @@ def build_partitioned_load(insert, re_match, can_hint, params): batch = int(re_match.group(4)) insert = PARTITIONED_INSERT_RE.sub("", insert) statements = [] - params["hint"] = HINT_SHUFFLE if can_hint else '' + params[VAR_HINT] = HINT_SHUFFLE if can_hint else '' first_part = min_val while first_part < max_val: if first_part + batch >= max_val: # This is the last batch if first_part == min_val: # There is only 1 batch in this insert. No predicate needed. - params["part_predicate"] = '' + params[VAR_PART_PREDICATE] = '' else: # Insert the remaining partitions + NULL partition - params["part_predicate"] = "WHERE {0} <= {1} OR {2} IS NULL".format( + params[VAR_PART_PREDICATE] = "WHERE {0} <= {1} OR {2} IS NULL".format( first_part, part_col, part_col) elif first_part == min_val: # This is the first batch. - params["part_predicate"] = "WHERE {0} < {1}".format( + params[VAR_PART_PREDICATE] = "WHERE {0} < {1}".format( part_col, first_part + batch) else: # This is the middle batch. 
- params["part_predicate"] = "WHERE {0} <= {1} AND {2} < {3}".format( + params[VAR_PART_PREDICATE] = "WHERE {0} <= {1} AND {2} < {3}".format( first_part, part_col, part_col, first_part + batch) statements.append(insert.format(**params)) first_part += batch @@ -531,24 +577,24 @@ def build_partitioned_load(insert, re_match, can_hint, params): def build_insert_into_statement(insert, db_name, db_suffix, table_name, file_format, - hdfs_path, for_impala=False): + hdfs_path, for_impala=False, scale_factor=""): can_hint = for_impala and (file_format == 'parquet' or is_iceberg_table(file_format)) - params = { - "db_name": db_name, - "db_suffix": db_suffix, - "table_name": table_name, - "hdfs_location": hdfs_path, - "impala_home": os.getenv("IMPALA_HOME"), - "hint": "", - "part_predicate": "" - } + params = build_replacement_params( + db_name, + db_suffix, + table_name, + hdfs_location=hdfs_path, + impala_home=IMPALA_HOME, + hint="", + part_predicate="", + scale_factor=scale_factor) m = PARTITIONED_INSERT_RE.search(insert) if m: insert_statement = build_partitioned_load(insert, m, can_hint, params) else: if can_hint: - params["hint"] = HINT_SHUFFLE + params[VAR_HINT] = HINT_SHUFFLE insert_statement = insert.format(**params) # Kudu tables are managed and don't support OVERWRITE, so we replace OVERWRITE @@ -573,20 +619,22 @@ def build_insert_into_statement(insert, db_name, db_suffix, table_name, file_for statement += SET_HIVE_INPUT_FORMAT % "HiveInputFormat" return statement + insert_statement + def build_hbase_insert(db_name, db_suffix, table_name): hbase_insert = SET_HIVE_HBASE_BULK_LOAD + ';\n' # For Apache Hive, "hive.hbase.bulk does not exist" exception will be thrown and there # is only warning in cdp if os.environ['USE_APACHE_HIVE'] == "true": hbase_insert = "" + params = build_replacement_params(db_name, db_suffix, table_name) hbase_insert += ("INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}" - " SELECT * FROM {db_name}.{table_name};\n").\ - format(db_name=db_name, db_suffix=db_suffix,table_name=table_name) + " SELECT * FROM {db_name}.{table_name};\n").format(**params) return hbase_insert + def build_insert(insert, db_name, db_suffix, file_format, codec, compression_type, table_name, hdfs_path, create_hive=False, - for_impala=False): + for_impala=False, scale_factor=""): # HBASE inserts don't need the hive options to be set, and don't require and HDFS # file location, so they're handled separately. if file_format == 'hbase' and not create_hive: @@ -602,26 +650,25 @@ def build_insert(insert, db_name, db_suffix, file_format, output += build_impala_parquet_codec_statement(codec) + "\n" output += build_insert_into_statement(insert, db_name, db_suffix, table_name, file_format, hdfs_path, - for_impala) + "\n" + for_impala, scale_factor) + "\n" return output -def build_load_statement(load_template, db_name, db_suffix, table_name): +def build_load_statement(load_template, db_name, db_suffix, table_name, scale_factor=""): # hbase does not need the hdfs path. 
if table_name.startswith('hbase'): - load_template = load_template.format(table_name=table_name, - db_name=db_name, - db_suffix=db_suffix) + params = build_replacement_params(db_name, db_suffix, table_name) + load_template = load_template.format(**params) else: - base_load_dir = os.getenv("REMOTE_LOAD", os.getenv("IMPALA_HOME")) - params = { - "db_name": db_name, - "db_suffix": db_suffix, - "table_name": table_name, - "impala_home": base_load_dir, - "hint": "", - "part_predicate": "" - } + base_load_dir = os.getenv("REMOTE_LOAD", IMPALA_HOME) + params = build_replacement_params( + db_name, + db_suffix, + table_name, + impala_home=base_load_dir, + hint="", + part_predicate="", + scale_factor=scale_factor) m = PARTITIONED_INSERT_RE.search(load_template) if m: @@ -646,11 +693,12 @@ def build_hbase_create_stmt(db_name, table_name, column_families, region_splits) create_stmts.append(create_statement) return create_stmts + # Does a hdfs directory listing and returns array with all the subdir names. def get_hdfs_subdirs_with_data(path): tmp_file = tempfile.TemporaryFile("w+") cmd = "hadoop fs -du %s | grep -v '^0' | awk '{print $3}'" % path - subprocess.call([cmd], shell = True, stderr = open('/dev/null'), stdout = tmp_file) + subprocess.call([cmd], shell=True, stderr=open('/dev/null'), stdout=tmp_file) tmp_file.seek(0) # Results look like: @@ -658,6 +706,7 @@ def get_hdfs_subdirs_with_data(path): # So to get subdirectory names just return everything after the last '/' return [line[line.rfind('/') + 1:].strip() for line in tmp_file.readlines()] + class Statements(object): """Simple container object for storing SQL statements to be output to a file. Useful for ordering the statements correctly.""" @@ -676,6 +725,7 @@ class Statements(object): def __bool__(self): return bool(self.create or self.load or self.load_base) + def eval_section(section_str): """section_str should be the contents of a section (i.e. a string). If section_str starts with `, evaluates section_str as a shell command and returns the @@ -690,6 +740,7 @@ def eval_section(section_str): assert p.returncode == 0 return stdout.strip() + def generate_statements(output_name, test_vectors, sections, schema_include_constraints, schema_exclude_constraints, schema_only_constraints, convert_orc_to_full_acid): @@ -801,10 +852,10 @@ def generate_statements(output_name, test_vectors, sections, tblproperties = parse_table_properties(create_file_format, table_properties) # ORC tables are full ACID by default. - if (convert_orc_to_full_acid and - HIVE_MAJOR_VERSION == 3 and - create_file_format == 'orc' and - 'transactional' not in tblproperties): + if (convert_orc_to_full_acid + and HIVE_MAJOR_VERSION == 3 + and create_file_format == 'orc' + and 'transactional' not in tblproperties): tblproperties['transactional'] = 'true' if create_file_format == 'orc' and create_codec != 'def': # The default value of 'orc.compress' is ZLIB which corresponds to 'def'. 
@@ -861,15 +912,15 @@ def generate_statements(output_name, test_vectors, sections, if file_format == 'avro': if not os.path.exists(avro_schema_dir): os.makedirs(avro_schema_dir) - with open("%s/%s.json" % (avro_schema_dir, table_name),"w") as f: + with open("%s/%s.json" % (avro_schema_dir, table_name), "w") as f: f.write(avro_schema(columns)) else: table_template = None if table_template: output.create.append(build_create_statement(table_template, table_name, db_name, - db_suffix, create_file_format, create_codec, data_path, force_reload, - create_hive)) + db_suffix, create_file_format, data_path, force_reload, create_hive, + options.scale_factor)) # HBASE create table if file_format == 'hbase': # If the HBASE_COLUMN_FAMILIES section does not exist, default to 'd' @@ -907,18 +958,20 @@ def generate_statements(output_name, test_vectors, sections, load_from_json_file = file_format == 'json' and table_name.endswith('_json') if not db_suffix or load_from_json_file: if load: - hive_output.load_base.append(build_load_statement(load, db_name, - db_suffix, table_name)) + hive_output.load_base.append( + build_load_statement(load, db_name, db_suffix, table_name, + options.scale_factor)) else: print('Empty base table load for %s. Skipping load generation' % table_name) elif file_format in ['kudu', 'parquet', 'iceberg']: if insert_hive: hive_output.load.append(build_insert(insert_hive, db_name, db_suffix, - file_format, codec, compression_type, table_name, data_path)) + file_format, codec, compression_type, table_name, data_path, + for_impala=False, scale_factor=options.scale_factor)) elif insert: impala_load.load.append(build_insert(insert, db_name, db_suffix, file_format, codec, compression_type, table_name, data_path, - for_impala=True)) + for_impala=True, scale_factor=options.scale_factor)) else: print('Empty parquet/kudu load for table %s. 
Skipping insert generation' % table_name) @@ -961,6 +1014,7 @@ def parse_schema_template_file(file_name): 'TABLE_PROPERTIES', 'HBASE_REGION_SPLITS', 'HIVE_MAJOR_VERSION'] return parse_test_file(file_name, VALID_SECTION_NAMES, skip_unknown_sections=False) + if __name__ == "__main__": if options.table_formats is None: if options.exploration_strategy not in KNOWN_EXPLORATION_STRATEGIES: @@ -968,7 +1022,7 @@ if __name__ == "__main__": print('Valid values:', ', '.join(KNOWN_EXPLORATION_STRATEGIES)) sys.exit(1) - test_vectors = [vector.value for vector in\ + test_vectors = [vector.value for vector in load_table_info_dimension(options.workload, options.exploration_strategy)] else: table_formats = options.table_formats.split(',') diff --git a/testdata/datasets/tpcds_partitioned/tpcds_partitioned_schema_template.sql b/testdata/datasets/tpcds_partitioned/tpcds_partitioned_schema_template.sql index a7ceecda1..6c6b436ff 100644 --- a/testdata/datasets/tpcds_partitioned/tpcds_partitioned_schema_template.sql +++ b/testdata/datasets/tpcds_partitioned/tpcds_partitioned_schema_template.sql @@ -35,7 +35,7 @@ t_meal_time string primary key (t_time_sk) DISABLE NOVALIDATE RELY ---- DEPENDENT_LOAD INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} -SELECT * FROM tpcds.{table_name}; +SELECT * FROM tpcds{scale_factor}.{table_name}; ==== ---- DATASET tpcds_partitioned @@ -73,7 +73,7 @@ d_current_year string primary key (d_date_sk) DISABLE NOVALIDATE RELY ---- DEPENDENT_LOAD INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} -SELECT * FROM tpcds.{table_name}; +SELECT * FROM tpcds{scale_factor}.{table_name}; ==== ---- DATASET tpcds_partitioned @@ -86,7 +86,7 @@ r_reason_desc string primary key (r_reason_sk) DISABLE NOVALIDATE RELY ---- DEPENDENT_LOAD INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} -SELECT * FROM tpcds.{table_name}; +SELECT * FROM tpcds{scale_factor}.{table_name}; ==== ---- DATASET tpcds_partitioned @@ -109,7 +109,7 @@ ca_location_type string primary key (ca_address_sk) DISABLE NOVALIDATE RELY ---- DEPENDENT_LOAD INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} -SELECT * FROM tpcds.{table_name}; +SELECT * FROM tpcds{scale_factor}.{table_name}; ==== ---- DATASET tpcds_partitioned @@ -122,7 +122,7 @@ ib_upper_bound int primary key (ib_income_band_sk) DISABLE NOVALIDATE RELY ---- DEPENDENT_LOAD INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} -SELECT * FROM tpcds.{table_name}; +SELECT * FROM tpcds{scale_factor}.{table_name}; ==== ---- DATASET tpcds_partitioned @@ -138,7 +138,7 @@ primary key (hd_demo_sk) DISABLE NOVALIDATE RELY foreign key (hd_income_band_sk) references {db_name}{db_suffix}.income_band (ib_income_band_sk) DISABLE NOVALIDATE RELY ---- DEPENDENT_LOAD INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} -SELECT * FROM tpcds.{table_name}; +SELECT * FROM tpcds{scale_factor}.{table_name}; ==== ---- DATASET tpcds_partitioned @@ -157,7 +157,7 @@ cd_dep_college_count int primary key (cd_demo_sk) DISABLE NOVALIDATE RELY ---- DEPENDENT_LOAD INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} -SELECT * FROM tpcds.{table_name}; +SELECT * FROM tpcds{scale_factor}.{table_name}; ==== ---- DATASET tpcds_partitioned @@ -173,7 +173,7 @@ sm_contract string primary key (sm_ship_mode_sk) DISABLE NOVALIDATE RELY ---- DEPENDENT_LOAD INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} -SELECT * FROM tpcds.{table_name}; +SELECT * FROM tpcds{scale_factor}.{table_name}; ==== ---- DATASET tpcds_partitioned @@ -252,7 +252,7 @@ SELECT i_container, i_manager_id, 
i_product_name -FROM tpcds.{table_name}; +FROM tpcds{scale_factor}.{table_name}; ==== ---- DATASET tpcds_partitioned @@ -276,7 +276,7 @@ w_gmt_offset decimal(5,2) primary key (w_warehouse_sk) DISABLE NOVALIDATE RELY ---- DEPENDENT_LOAD INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} -SELECT * FROM tpcds.{table_name}; +SELECT * FROM tpcds{scale_factor}.{table_name}; ==== ---- DATASET tpcds_partitioned @@ -319,7 +319,7 @@ foreign key (cc_closed_date_sk) references {db_name}{db_suffix}.date_dim (d_date foreign key (cc_open_date_sk) references {db_name}{db_suffix}.date_dim (d_date_sk) DISABLE NOVALIDATE RELY ---- DEPENDENT_LOAD INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} -SELECT * FROM tpcds.{table_name}; +SELECT * FROM tpcds{scale_factor}.{table_name}; ==== ---- DATASET tpcds_partitioned @@ -392,7 +392,7 @@ SELECT c_login, c_email_address, CAST(c_last_review_date AS INT) -FROM tpcds.{table_name}; +FROM tpcds{scale_factor}.{table_name}; ==== ---- DATASET tpcds_partitioned @@ -465,7 +465,7 @@ SELECT p_channel_details, p_purpose, p_discount_active -FROM tpcds.{table_name}; +FROM tpcds{scale_factor}.{table_name}; ==== ---- DATASET tpcds_partitioned @@ -505,7 +505,7 @@ primary key (s_store_sk) DISABLE NOVALIDATE RELY foreign key (s_closed_date_sk) references {db_name}{db_suffix}.date_dim (d_date_sk) DISABLE NOVALIDATE RELY ---- DEPENDENT_LOAD INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} -SELECT * FROM tpcds.{table_name}; +SELECT * FROM tpcds{scale_factor}.{table_name}; ==== ---- DATASET tpcds_partitioned @@ -526,7 +526,7 @@ foreign key (cp_start_date_sk) references {db_name}{db_suffix}.date_dim (d_date_ foreign key (cp_end_date_sk) references {db_name}{db_suffix}.date_dim (d_date_sk) DISABLE NOVALIDATE RELY ---- DEPENDENT_LOAD INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} -SELECT * FROM tpcds.{table_name}; +SELECT * FROM tpcds{scale_factor}.{table_name}; ==== ---- DATASET tpcds_partitioned @@ -553,7 +553,7 @@ foreign key (wp_access_date_sk) references {db_name}{db_suffix}.date_dim (d_date foreign key (wp_customer_sk) references {db_name}{db_suffix}.customer (c_customer_sk) DISABLE NOVALIDATE RELY ---- DEPENDENT_LOAD INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} -SELECT * FROM tpcds.{table_name}; +SELECT * FROM tpcds{scale_factor}.{table_name}; ==== ---- DATASET tpcds_partitioned @@ -591,7 +591,7 @@ foreign key (web_open_date_sk) references {db_name}{db_suffix}.date_dim (d_date_ foreign key (web_close_date_sk) references {db_name}{db_suffix}.date_dim (d_date_sk) DISABLE NOVALIDATE RELY ---- DEPENDENT_LOAD INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} -SELECT * FROM tpcds.{table_name}; +SELECT * FROM tpcds{scale_factor}.{table_name}; ==== ---- DATASET tpcds_partitioned @@ -629,7 +629,7 @@ SELECT inv_warehouse_sk, inv_quantity_on_hand, inv_date_sk -FROM tpcds.{table_name}; +FROM tpcds{scale_factor}.{table_name}; ==== ---- DATASET tpcds_partitioned @@ -762,7 +762,7 @@ SELECT cs_net_paid_inc_ship_tax, cs_net_profit, cs_sold_date_sk -FROM tpcds.{table_name}; +FROM tpcds{scale_factor}.{table_name}; ==== ---- DATASET tpcds_partitioned @@ -874,7 +874,7 @@ SELECT cr_store_credit, cr_net_loss, cr_returned_date_sk -FROM tpcds.{table_name}; +FROM tpcds{scale_factor}.{table_name}; ==== ---- DATASET tpcds_partitioned @@ -966,7 +966,7 @@ SELECT ss_net_paid_inc_tax, ss_net_profit, ss_sold_date_sk -FROM tpcds.{table_name}; +FROM tpcds{scale_factor}.{table_name}; ==== ---- DATASET tpcds_partitioned @@ -1050,7 +1050,7 @@ SELECT sr_store_credit, 
sr_net_loss, sr_returned_date_sk -FROM tpcds.{table_name}; +FROM tpcds{scale_factor}.{table_name}; ==== ---- DATASET tpcds_partitioned @@ -1183,7 +1183,7 @@ SELECT ws_net_paid_inc_ship_tax, ws_net_profit, ws_sold_date_sk -FROM tpcds.{table_name}; +FROM tpcds{scale_factor}.{table_name}; ==== ---- DATASET tpcds_partitioned @@ -1283,5 +1283,5 @@ SELECT wr_account_credit, wr_net_loss, wr_returned_date_sk -FROM tpcds.{table_name}; +FROM tpcds{scale_factor}.{table_name}; ==== diff --git a/tests/util/test_file_parser.py b/tests/util/test_file_parser.py index 754854f49..d0136a9cf 100644 --- a/tests/util/test_file_parser.py +++ b/tests/util/test_file_parser.py @@ -394,7 +394,7 @@ def load_tpc_queries(workload, include_stress_queries=False, query_name_filters= # find. Both workload directories contain other queries that are not part of the TPC # spec. file_workload = workload - if workload == "tpcds": + if workload in ["tpcds", "tpcds_partitioned"]: # TPCDS is assumed to always use decimal_v2, which is the default since 3.0 file_workload = "tpcds-decimal_v2" if include_stress_queries: @@ -402,7 +402,11 @@ def load_tpc_queries(workload, include_stress_queries=False, query_name_filters= else: file_name_pattern = re.compile(r"^{0}-(q.*).test$".format(file_workload)) - query_name_pattern = re.compile(r"^{0}-(.*)$".format(workload.upper())) + query_name_prefix = workload.upper() + if workload == "tpcds_partitioned": + # 'tpcds_partitioned' symlink to the same queries set of 'tpcds'. + query_name_prefix = "TPCDS" + query_name_pattern = re.compile(r"^{0}-(.*)$".format(query_name_prefix)) if workload == "tpch_nested": query_name_pattern = re.compile(r"^TPCH-(.*)$") @@ -426,6 +430,9 @@ def load_tpc_queries(workload, include_stress_queries=False, query_name_filters= test_cases = parse_query_test_file(file_path) for test_case in test_cases: query_sql = remove_comments(test_case["QUERY"]) + if workload == "tpcds_partitioned": + # replace old columns names from old TPC-DS spec with a new one. + query_sql = query_sql.replace("c_last_review_date", "c_last_review_date_sk") if re.match(filter_regex, test_case["QUERY_NAME"]): query_name_match = query_name_pattern.search(test_case["QUERY_NAME"])
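
To make the {scale_factor} placeholder concrete, here is a minimal, self-contained Python sketch of how the DEPENDENT_LOAD templates above are rendered. The build_replacement_params() stand-in below is simplified for illustration (the real helper lives in testdata/bin/generate-schema-statements.py and also handles hdfs_location, impala_home, hint, part_predicate, and file_format), and the database names are hypothetical values for a scale-10 perf run.

    # Simplified stand-in for the build_replacement_params() helper added by
    # this patch.
    def build_replacement_params(db_name, db_suffix, table_name, scale_factor=None):
        params = {"db_name": db_name, "db_suffix": db_suffix, "table_name": table_name}
        if scale_factor is not None:
            params["scale_factor"] = scale_factor
        return params

    # DEPENDENT_LOAD template line from tpcds_partitioned_schema_template.sql.
    TEMPLATE = ("INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}\n"
                "SELECT * FROM tpcds{scale_factor}.{table_name};")

    # Hypothetical scale-10 run: the text database is tpcds10 and the target is
    # tpcds_partitioned10_parquet (exact naming follows the load framework).
    params = build_replacement_params(
        "tpcds_partitioned10", "_parquet", "store_sales", scale_factor="10")
    print(TEMPLATE.format(**params))
    # INSERT OVERWRITE TABLE tpcds_partitioned10_parquet.store_sales
    # SELECT * FROM tpcds10.store_sales;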
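
A small sketch of the query adaptation described in the commit message: tpcds_partitioned shares the tpcds query files, so load_tpc_queries() rewrites the column that TPC-DS v2.11.0 renamed. The sample query string below is hypothetical; the substitution is the one the patch applies in tests/util/test_file_parser.py.

    # Hypothetical shared tpcds query; the real queries come from
    # testdata/workloads/tpcds/queries via the tpcds_partitioned symlink.
    query_sql = "SELECT c_customer_id, c_last_review_date FROM customer"

    workload = "tpcds_partitioned"
    if workload == "tpcds_partitioned":
        # Same substitution load_tpc_queries() applies for this workload:
        # TPC-DS v2.11.0 renamed c_last_review_date to c_last_review_date_sk.
        query_sql = query_sql.replace("c_last_review_date", "c_last_review_date_sk")

    print(query_sql)  # SELECT c_customer_id, c_last_review_date_sk FROM customer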
