This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit f533225915d78e1a2a3d1e6606cf4f6d064151d2
Author: Riza Suminto <[email protected]>
AuthorDate: Sat Nov 9 15:44:24 2024 -0800

    IMPALA-13543: single_node_perf_run.py must accept tpcds_partitioned
    
    tpcds_partitioned dataset is a fully-partitioned version of tpcds
    dataset (the latter only partitions the store_sales table). It does not have
    the default text format database like tpcds dataset. Instead, it relies
    on the pre-existence of a text format tpcds database, which is then loaded
    via INSERT OVERWRITE into the equivalent tpcds_partitioned database. It does
    not have its own query set, but is instead symlinked to share
    testdata/workloads/tpcds/queries. It also has a slightly different schema
    from tpcds dataset, namely column "c_last_review_date" in tpcds dataset
    is "c_last_review_date_sk" in tpcds_partitioned (TPC-DS v2.11.0, section
    2.4.7). These reasons make tpcds_partitioned ineligible for
    perf-AB-test (single_node_perf_run.py).
    
    This patch updates single_node_perf_run.py and related scripts to make
    tpcds_partitioned eligible as a benchmark dataset. It adds an initial
    step to load the text database from the tpcds dataset at the selected
    scale before running the load script for the tpcds_partitioned dataset.
    The compute stats step is also limited to run one query at a time so as
    not to overadmit the cluster with concurrent compute stats queries.
    
    Created helper function build_replacement_params() inside
    generate-schema-statements.py to factor out common functionality.
    
    Testing
    - Run perf-AB-test-ub2004 with this commit included and confirm
      benchmark works with tpcds_partitioned dataset.
    - Run normal data loading. Pass FE tests, and
      query_test/test_tpcds_queries.py.
    
    Change-Id: I4b6f435705dcf873696ffd151052ebeab35d9898
    Reviewed-on: http://gerrit.cloudera.org:8080/22061
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 bin/single_node_perf_run.py                        |  28 ++-
 testdata/bin/generate-schema-statements.py         | 208 +++++++++++++--------
 .../tpcds_partitioned_schema_template.sql          |  48 ++---
 tests/util/test_file_parser.py                     |  11 +-
 4 files changed, 184 insertions(+), 111 deletions(-)

diff --git a/bin/single_node_perf_run.py b/bin/single_node_perf_run.py
index 4255ac7b4..4f77977df 100755
--- a/bin/single_node_perf_run.py
+++ b/bin/single_node_perf_run.py
@@ -88,6 +88,7 @@ import textwrap
 from tests.common.test_dimensions import TableFormatInfo
 
 IMPALA_HOME = os.environ["IMPALA_HOME"]
+IMPALA_PERF_RESULTS = os.path.join(IMPALA_HOME, "perf_results")
 
 
 def configured_call(cmd):
@@ -100,14 +101,17 @@ def configured_call(cmd):
 
 def load_data(db_to_load, table_formats, scale):
   """Loads a database with a particular scale factor."""
+  all_formats = ("text/none," + table_formats if "text/none" not in 
table_formats
+                 else table_formats)
   configured_call(["{0}/bin/load-data.py".format(IMPALA_HOME),
                    "--workloads", db_to_load, "--scale_factor", str(scale),
-                   "--table_formats", "text/none," + table_formats])
+                   "--table_formats", all_formats])
   for table_format in table_formats.split(","):
     suffix = TableFormatInfo.create_from_string(None, table_format).db_suffix()
     db_name = db_to_load + scale + suffix
     
configured_call(["{0}/tests/util/compute_table_stats.py".format(IMPALA_HOME),
-                     "--stop_on_error", "--db_names", db_name])
+                     "--stop_on_error", "--db_names", db_name,
+                     "--parallelism", "1"])
 
 
 def get_git_hash_for_name(name):
@@ -164,7 +168,7 @@ def run_workload(base_dir, workloads, options):
 
 def report_benchmark_results(file_a, file_b, description):
   """Wrapper around report_benchmark_result.py."""
-  result = "{0}/perf_results/latest/performance_result.txt".format(IMPALA_HOME)
+  result = os.path.join(IMPALA_PERF_RESULTS, "latest", 
"performance_result.txt")
   with open(result, "w") as f:
     subprocess.check_call(
       ["{0}/tests/benchmark/report_benchmark_results.py".format(IMPALA_HOME),
@@ -266,7 +270,7 @@ def perf_ab_test(options, args):
   hash_a = get_git_hash_for_name(args[0])
 
   # Create the base directory to store the results in
-  results_path = os.path.join(IMPALA_HOME, "perf_results")
+  results_path = IMPALA_PERF_RESULTS
   if not os.access(results_path, os.W_OK):
     os.makedirs(results_path)
 
@@ -284,12 +288,20 @@ def perf_ab_test(options, args):
     start_minicluster()
   start_impala(options.num_impalads, options)
 
-  workloads = set(options.workloads.split(","))
+  workloads = options.workloads.split(",")
 
   if options.load:
-    WORKLOAD_TO_DATASET = {"tpch": "tpch", "tpcds": "tpcds", "targeted-perf": 
"tpch",
-                           "tpcds-unmodified": "tpcds-unmodified"}
-    datasets = set([WORKLOAD_TO_DATASET[workload] for workload in workloads])
+    WORKLOAD_TO_DATASET = {
+      "tpch": "tpch",
+      "tpcds": "tpcds",
+      "targeted-perf": "tpch",
+      "tpcds-unmodified": "tpcds-unmodified",
+      "tpcds_partitioned": "tpcds_partitioned"
+    }
+    datasets = [WORKLOAD_TO_DATASET[workload] for workload in workloads]
+    if "tpcds_partitioned" in datasets and "tpcds" not in datasets:
+      # "tpcds_partitioned" require the text "tpcds" database.
+      load_data("tpcds", "text/none", options.scale)
     for dataset in datasets:
       load_data(dataset, options.table_formats, options.scale)
 
diff --git a/testdata/bin/generate-schema-statements.py 
b/testdata/bin/generate-schema-statements.py
index 2361fd2d6..195094c49 100755
--- a/testdata/bin/generate-schema-statements.py
+++ b/testdata/bin/generate-schema-statements.py
@@ -112,7 +112,7 @@ from tests.common.test_dimensions import (
 
 parser = OptionParser()
 parser.add_option("-e", "--exploration_strategy", dest="exploration_strategy",
-                  default="core", help="The exploration strategy for schema 
gen: 'core',"\
+                  default="core", help="The exploration strategy for schema 
gen: 'core',"
                   " 'pairwise', or 'exhaustive'")
 parser.add_option("--hive_warehouse_dir", dest="hive_warehouse_dir",
                   default="/test-warehouse",
@@ -122,16 +122,16 @@ parser.add_option("-w", "--workload", dest="workload",
 parser.add_option("-s", "--scale_factor", dest="scale_factor", default="",
                   help="An optional scale factor to generate the schema for")
 parser.add_option("-f", "--force_reload", dest="force_reload", 
action="store_true",
-                  default= False, help='Skips HDFS exists check and reloads 
all tables')
+                  default=False, help='Skips HDFS exists check and reloads all 
tables')
 parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
-                  default = False, help="If set, outputs additional logging.")
+                  default=False, help="If set, outputs additional logging.")
 parser.add_option("-b", "--backend", dest="backend", default="localhost:21000",
                   help="Backend connection to use, default: localhost:21000")
 parser.add_option("--table_names", dest="table_names", default=None,
-                  help="Only load the specified tables - specified as a 
comma-seperated "\
+                  help="Only load the specified tables - specified as a 
comma-seperated "
                   "list of base table names")
 parser.add_option("--table_formats", dest="table_formats", default=None,
-                  help="Override the test vectors and load using the specified 
table "\
+                  help="Override the test vectors and load using the specified 
table "
                   "formats. Ex. --table_formats=seq/snap/block,text/none")
 parser.add_option("--hdfs_namenode", dest="hdfs_namenode", 
default="localhost:20500",
                   help="HDFS name node for Avro schema URLs, default 
localhost:20500")
@@ -142,11 +142,12 @@ if options.workload is None:
   parser.print_help()
   sys.exit(1)
 
-WORKLOAD_DIR = os.path.join(os.environ['IMPALA_HOME'], 'testdata', 'workloads')
-DATASET_DIR = os.path.join(os.environ['IMPALA_HOME'], 'testdata', 'datasets')
+IMPALA_HOME = os.getenv("IMPALA_HOME")
+WORKLOAD_DIR = os.path.join(IMPALA_HOME, 'testdata', 'workloads')
+DATASET_DIR = os.path.join(IMPALA_HOME, 'testdata', 'datasets')
 SQL_OUTPUT_DIR = os.environ['IMPALA_DATA_LOADING_SQL_DIR']
 AVRO_SCHEMA_DIR = "avro_schemas"
-DEFAULT_FS=os.environ['DEFAULT_FS']
+DEFAULT_FS = os.environ['DEFAULT_FS']
 IMPALA_SUPPORTED_INSERT_FORMATS = ['parquet', 'hbase', 'text', 'kudu']
 
 IMPALA_PARQUET_COMPRESSION_MAP = \
@@ -178,18 +179,19 @@ DATASET_IDX = 1
 CODEC_IDX = 2
 COMPRESSION_TYPE_IDX = 3
 
-COMPRESSION_MAP = {'def': 'org.apache.hadoop.io.compress.DefaultCodec',
-                   'gzip': 'org.apache.hadoop.io.compress.GzipCodec',
-                   'bzip': 'org.apache.hadoop.io.compress.BZip2Codec',
-                   'snap': 'org.apache.hadoop.io.compress.SnappyCodec',
-                   'none': ''
-                  }
+COMPRESSION_MAP = {
+  'def': 'org.apache.hadoop.io.compress.DefaultCodec',
+  'gzip': 'org.apache.hadoop.io.compress.GzipCodec',
+  'bzip': 'org.apache.hadoop.io.compress.BZip2Codec',
+  'snap': 'org.apache.hadoop.io.compress.SnappyCodec',
+  'none': ''
+}
 
 AVRO_COMPRESSION_MAP = {
   'def': 'deflate',
   'snap': 'snappy',
   'none': '',
-  }
+}
 
 HIVE_TO_AVRO_TYPE_MAP = {
   'STRING': 'string',
@@ -206,7 +208,7 @@ HIVE_TO_AVRO_TYPE_MAP = {
   # in our tests.
   'TIMESTAMP': 'string',
   'BINARY': 'bytes',
-  }
+}
 
 PARQUET_ALTER_STATEMENT = "ALTER TABLE %(table_name)s SET\n\
      SERDEPROPERTIES ('blocksize' = '1073741824', 'compression' = 
'%(compression)s');"
@@ -230,9 +232,47 @@ PARTITIONED_INSERT_RE = re.compile(
 
 HINT_SHUFFLE = "/* +shuffle, clustered */"
 
+# Variable names in the sql statement will be replaced by the actual values.
+# Use build_replacement_params() to create such variable:value map.
+VAR_DB_NAME = "db_name"
+VAR_DB_SUFFIX = "db_suffix"
+VAR_TABLE_NAME = "table_name"
+VAR_HDFS_LOCATION = "hdfs_location"
+VAR_IMPALA_HOME = "impala_home"
+VAR_HINT = "hint"
+VAR_PART_PREDICATE = "part_predicate"
+VAR_FILE_FORMAT = "file_format"
+VAR_SCALE_FACTOR = "scale_factor"
+
+
+def build_replacement_params(
+    db_name, db_suffix, table_name, hdfs_location=None, impala_home=None,
+    hint=None, part_predicate=None, file_format=None, scale_factor=None):
+  """Create a variable:value map for SQL replacement based on given parameters.
+  db_name, db_suffix, and table_name is mandatory. Others are optional."""
+  params = {
+    VAR_DB_NAME: db_name,
+    VAR_DB_SUFFIX: db_suffix,
+    VAR_TABLE_NAME: table_name
+  }
+  if hdfs_location is not None:
+    params[VAR_HDFS_LOCATION] = hdfs_location
+  if impala_home is not None:
+    params[VAR_IMPALA_HOME] = impala_home
+  if hint is not None:
+    params[VAR_HINT] = hint
+  if part_predicate is not None:
+    params[VAR_PART_PREDICATE] = part_predicate
+  if file_format is not None:
+    params[VAR_FILE_FORMAT] = file_format
+  if scale_factor is not None:
+    params[VAR_SCALE_FACTOR] = scale_factor
+  return params
+
+
 def build_create_statement(table_template, table_name, db_name, db_suffix,
-                           file_format, compression, hdfs_location,
-                           force_reload, is_hive_stmt):
+                           file_format, hdfs_location, force_reload, 
is_hive_stmt,
+                           scale_factor=""):
   create_stmt = ''
   if (force_reload):
     tbl_type = 'TABLE'
@@ -247,12 +287,14 @@ def build_create_statement(table_template, table_name, 
db_name, db_suffix,
     # Remove location part from the format string
     table_template = table_template.replace("LOCATION '{hdfs_location}'", "")
 
-  stmt = table_template.format(
-    db_name=db_name,
-    db_suffix=db_suffix,
-    table_name=table_name,
+  params = build_replacement_params(
+    db_name,
+    db_suffix,
+    table_name,
     file_format=FILE_FORMAT_TO_STORED_AS_MAP[file_format],
-    hdfs_location=hdfs_location)
+    hdfs_location=hdfs_location,
+    scale_factor=scale_factor)
+  stmt = table_template.format(**params)
   # Apache Hive 3.1 doesn't support "STORED BY ICEBERG STORED AS AVRO" and
   # "STORED AS JSONFILE" (HIVE-25162, HIVE-19899)
   if is_hive_stmt and os.environ['USE_APACHE_HIVE'] == "true":
@@ -299,9 +341,9 @@ def parse_table_properties(file_format, table_properties):
   tblproperties = {}
   TABLE_PROPERTY_RE = re.compile(
       # Optional '<data-format>:' prefix, capturing just the 'data-format' 
part.
-      r'(?:(\w+):)?' +
+      r'(?:(\w+):)?'
       # Required key=value, capturing the key and value
-      r'(.+?)=(.*)')
+      + r'(.+?)=(.*)')
   for table_property in [_f for _f in table_properties.split("\n") if _f]:
     m = TABLE_PROPERTY_RE.match(table_property)
     if not m:
@@ -408,13 +450,14 @@ LOCATION '{{hdfs_location}}'
     partitioned_by=partitioned_by,
     tblproperties=tblproperties_clause,
     file_format_string=file_format_string
-    ).strip()
+  ).strip()
   # Remove empty lines from the stmt string.  There is an empty line for
   # each of the sections that didn't have anything (e.g. partitioned_by)
   stmt = os.linesep.join([s for s in stmt.splitlines() if s])
   stmt += ';'
   return stmt
 
+
 def build_hbase_create_stmt_in_hive(columns, partition_columns, table_name):
   # The hbase create statement differs sufficiently from the generic create to 
justify a
   # separate method. Specifically, STORED AS becomes STORED BY. There is 
section called
@@ -439,15 +482,16 @@ def build_hbase_create_stmt_in_hive(columns, 
partition_columns, table_name):
     columns=',\n'.join(columns),
     hbase_column_mapping=hbase_column_mapping,
     tbl_properties=tbl_properties,
-    ).strip()
+  ).strip()
   return stmt + ';'
 
+
 def avro_schema(columns):
   record = {
-    "name": "a", # doesn't matter
+    "name": "a",  # doesn't matter
     "type": "record",
     "fields": list()
-    }
+  }
   for column_spec in columns.strip().split('\n'):
     # column_spec looks something like "col_name col_type COMMENT comment"
     # (comment may be omitted, we don't use it)
@@ -460,7 +504,7 @@ def avro_schema(columns):
         precision = 9
       else:
         # Parse out scale and precision from decimal type
-        m = re.search("DECIMAL\((?P<precision>.*),(?P<scale>.*)\)", 
column_spec.upper())
+        m = re.search(r"DECIMAL\((?P<precision>.*),(?P<scale>.*)\)", 
column_spec.upper())
         assert m, "Could not parse decimal column spec: " + column_spec
         scale = int(m.group('scale'))
         precision = int(m.group('precision'))
@@ -477,10 +521,11 @@ def avro_schema(columns):
 
     record["fields"].append(
       {'name': name,
-       'type': [type, "null"]}) # all columns nullable
+       'type': [type, "null"]})  # all columns nullable
 
   return json.dumps(record)
 
+
 def build_compression_codec_statement(codec, compression_type, file_format):
   codec = (AVRO_COMPRESSION_MAP if file_format == 'avro' else 
COMPRESSION_MAP).get(codec)
   if not codec:
@@ -488,6 +533,7 @@ def build_compression_codec_statement(codec, 
compression_type, file_format):
   return (AVRO_COMPRESSION_CODEC % codec) if file_format == 'avro' else (
     COMPRESSION_TYPE % compression_type.upper() + '\n' + COMPRESSION_CODEC % 
codec)
 
+
 def build_codec_enabled_statement(codec):
   compression_enabled = 'false' if codec == 'none' else 'true'
   return COMPRESSION_ENABLED % compression_enabled
@@ -505,25 +551,25 @@ def build_partitioned_load(insert, re_match, can_hint, 
params):
   batch = int(re_match.group(4))
   insert = PARTITIONED_INSERT_RE.sub("", insert)
   statements = []
-  params["hint"] = HINT_SHUFFLE if can_hint else ''
+  params[VAR_HINT] = HINT_SHUFFLE if can_hint else ''
   first_part = min_val
   while first_part < max_val:
     if first_part + batch >= max_val:
       # This is the last batch
       if first_part == min_val:
         # There is only 1 batch in this insert. No predicate needed.
-        params["part_predicate"] = ''
+        params[VAR_PART_PREDICATE] = ''
       else:
         # Insert the remaining partitions + NULL partition
-        params["part_predicate"] = "WHERE {0} <= {1} OR {2} IS NULL".format(
+        params[VAR_PART_PREDICATE] = "WHERE {0} <= {1} OR {2} IS NULL".format(
           first_part, part_col, part_col)
     elif first_part == min_val:
       # This is the first batch.
-      params["part_predicate"] = "WHERE {0} < {1}".format(
+      params[VAR_PART_PREDICATE] = "WHERE {0} < {1}".format(
         part_col, first_part + batch)
     else:
       # This is the middle batch.
-      params["part_predicate"] = "WHERE {0} <= {1} AND {2} < {3}".format(
+      params[VAR_PART_PREDICATE] = "WHERE {0} <= {1} AND {2} < {3}".format(
         first_part, part_col, part_col, first_part + batch)
     statements.append(insert.format(**params))
     first_part += batch
@@ -531,24 +577,24 @@ def build_partitioned_load(insert, re_match, can_hint, 
params):
 
 
 def build_insert_into_statement(insert, db_name, db_suffix, table_name, 
file_format,
-                                hdfs_path, for_impala=False):
+                                hdfs_path, for_impala=False, scale_factor=""):
   can_hint = for_impala and (file_format == 'parquet' or 
is_iceberg_table(file_format))
-  params = {
-    "db_name": db_name,
-    "db_suffix": db_suffix,
-    "table_name": table_name,
-    "hdfs_location": hdfs_path,
-    "impala_home": os.getenv("IMPALA_HOME"),
-    "hint": "",
-    "part_predicate": ""
-  }
+  params = build_replacement_params(
+    db_name,
+    db_suffix,
+    table_name,
+    hdfs_location=hdfs_path,
+    impala_home=IMPALA_HOME,
+    hint="",
+    part_predicate="",
+    scale_factor=scale_factor)
 
   m = PARTITIONED_INSERT_RE.search(insert)
   if m:
     insert_statement = build_partitioned_load(insert, m, can_hint, params)
   else:
     if can_hint:
-      params["hint"] = HINT_SHUFFLE
+      params[VAR_HINT] = HINT_SHUFFLE
     insert_statement = insert.format(**params)
 
   # Kudu tables are managed and don't support OVERWRITE, so we replace 
OVERWRITE
@@ -573,20 +619,22 @@ def build_insert_into_statement(insert, db_name, 
db_suffix, table_name, file_for
     statement += SET_HIVE_INPUT_FORMAT % "HiveInputFormat"
   return statement + insert_statement
 
+
 def build_hbase_insert(db_name, db_suffix, table_name):
   hbase_insert = SET_HIVE_HBASE_BULK_LOAD + ';\n'
   # For Apache Hive, "hive.hbase.bulk does not exist" exception will be thrown 
and there
   # is only warning in cdp
   if os.environ['USE_APACHE_HIVE'] == "true":
     hbase_insert = ""
+  params = build_replacement_params(db_name, db_suffix, table_name)
   hbase_insert += ("INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}"
-                   " SELECT * FROM {db_name}.{table_name};\n").\
-                   format(db_name=db_name, 
db_suffix=db_suffix,table_name=table_name)
+                   " SELECT * FROM {db_name}.{table_name};\n").format(**params)
   return hbase_insert
 
+
 def build_insert(insert, db_name, db_suffix, file_format,
                  codec, compression_type, table_name, hdfs_path, 
create_hive=False,
-                 for_impala=False):
+                 for_impala=False, scale_factor=""):
   # HBASE inserts don't need the hive options to be set, and don't require and 
HDFS
   # file location, so they're handled separately.
   if file_format == 'hbase' and not create_hive:
@@ -602,26 +650,25 @@ def build_insert(insert, db_name, db_suffix, file_format,
     output += build_impala_parquet_codec_statement(codec) + "\n"
   output += build_insert_into_statement(insert, db_name, db_suffix,
                                         table_name, file_format, hdfs_path,
-                                        for_impala) + "\n"
+                                        for_impala, scale_factor) + "\n"
   return output
 
 
-def build_load_statement(load_template, db_name, db_suffix, table_name):
+def build_load_statement(load_template, db_name, db_suffix, table_name, 
scale_factor=""):
   # hbase does not need the hdfs path.
   if table_name.startswith('hbase'):
-    load_template = load_template.format(table_name=table_name,
-                                         db_name=db_name,
-                                         db_suffix=db_suffix)
+    params = build_replacement_params(db_name, db_suffix, table_name)
+    load_template = load_template.format(**params)
   else:
-    base_load_dir = os.getenv("REMOTE_LOAD", os.getenv("IMPALA_HOME"))
-    params = {
-      "db_name": db_name,
-      "db_suffix": db_suffix,
-      "table_name": table_name,
-      "impala_home": base_load_dir,
-      "hint": "",
-      "part_predicate": ""
-    }
+    base_load_dir = os.getenv("REMOTE_LOAD", IMPALA_HOME)
+    params = build_replacement_params(
+      db_name,
+      db_suffix,
+      table_name,
+      impala_home=base_load_dir,
+      hint="",
+      part_predicate="",
+      scale_factor=scale_factor)
 
     m = PARTITIONED_INSERT_RE.search(load_template)
     if m:
@@ -646,11 +693,12 @@ def build_hbase_create_stmt(db_name, table_name, 
column_families, region_splits)
   create_stmts.append(create_statement)
   return create_stmts
 
+
 # Does a hdfs directory listing and returns array with all the subdir names.
 def get_hdfs_subdirs_with_data(path):
   tmp_file = tempfile.TemporaryFile("w+")
   cmd = "hadoop fs -du %s | grep -v '^0' | awk '{print $3}'" % path
-  subprocess.call([cmd], shell = True, stderr = open('/dev/null'), stdout = 
tmp_file)
+  subprocess.call([cmd], shell=True, stderr=open('/dev/null'), stdout=tmp_file)
   tmp_file.seek(0)
 
   # Results look like:
@@ -658,6 +706,7 @@ def get_hdfs_subdirs_with_data(path):
   # So to get subdirectory names just return everything after the last '/'
   return [line[line.rfind('/') + 1:].strip() for line in tmp_file.readlines()]
 
+
 class Statements(object):
   """Simple container object for storing SQL statements to be output to a
   file. Useful for ordering the statements correctly."""
@@ -676,6 +725,7 @@ class Statements(object):
   def __bool__(self):
     return bool(self.create or self.load or self.load_base)
 
+
 def eval_section(section_str):
   """section_str should be the contents of a section (i.e. a string). If 
section_str
   starts with `, evaluates section_str as a shell command and returns the
@@ -690,6 +740,7 @@ def eval_section(section_str):
   assert p.returncode == 0
   return stdout.strip()
 
+
 def generate_statements(output_name, test_vectors, sections,
                         schema_include_constraints, schema_exclude_constraints,
                         schema_only_constraints, convert_orc_to_full_acid):
@@ -801,10 +852,10 @@ def generate_statements(output_name, test_vectors, 
sections,
 
       tblproperties = parse_table_properties(create_file_format, 
table_properties)
       # ORC tables are full ACID by default.
-      if (convert_orc_to_full_acid and
-          HIVE_MAJOR_VERSION == 3 and
-          create_file_format == 'orc' and
-          'transactional' not in tblproperties):
+      if (convert_orc_to_full_acid
+          and HIVE_MAJOR_VERSION == 3
+          and create_file_format == 'orc'
+          and 'transactional' not in tblproperties):
         tblproperties['transactional'] = 'true'
       if create_file_format == 'orc' and create_codec != 'def':
         # The default value of 'orc.compress' is ZLIB which corresponds to 
'def'.
@@ -861,15 +912,15 @@ def generate_statements(output_name, test_vectors, 
sections,
         if file_format == 'avro':
           if not os.path.exists(avro_schema_dir):
             os.makedirs(avro_schema_dir)
-          with open("%s/%s.json" % (avro_schema_dir, table_name),"w") as f:
+          with open("%s/%s.json" % (avro_schema_dir, table_name), "w") as f:
             f.write(avro_schema(columns))
       else:
         table_template = None
 
       if table_template:
         output.create.append(build_create_statement(table_template, 
table_name, db_name,
-            db_suffix, create_file_format, create_codec, data_path, 
force_reload,
-            create_hive))
+            db_suffix, create_file_format, data_path, force_reload, 
create_hive,
+            options.scale_factor))
       # HBASE create table
       if file_format == 'hbase':
         # If the HBASE_COLUMN_FAMILIES section does not exist, default to 'd'
@@ -907,18 +958,20 @@ def generate_statements(output_name, test_vectors, 
sections,
         load_from_json_file = file_format == 'json' and 
table_name.endswith('_json')
         if not db_suffix or load_from_json_file:
           if load:
-            hive_output.load_base.append(build_load_statement(load, db_name,
-                                                              db_suffix, 
table_name))
+            hive_output.load_base.append(
+                build_load_statement(load, db_name, db_suffix, table_name,
+                                     options.scale_factor))
           else:
             print('Empty base table load for %s. Skipping load generation' % 
table_name)
         elif file_format in ['kudu', 'parquet', 'iceberg']:
           if insert_hive:
             hive_output.load.append(build_insert(insert_hive, db_name, 
db_suffix,
-                file_format, codec, compression_type, table_name, data_path))
+                file_format, codec, compression_type, table_name, data_path,
+                for_impala=False, scale_factor=options.scale_factor))
           elif insert:
             impala_load.load.append(build_insert(insert, db_name, db_suffix,
                 file_format, codec, compression_type, table_name, data_path,
-                for_impala=True))
+                for_impala=True, scale_factor=options.scale_factor))
           else:
             print('Empty parquet/kudu load for table %s. Skipping insert 
generation'
               % table_name)
@@ -961,6 +1014,7 @@ def parse_schema_template_file(file_name):
                          'TABLE_PROPERTIES', 'HBASE_REGION_SPLITS', 
'HIVE_MAJOR_VERSION']
   return parse_test_file(file_name, VALID_SECTION_NAMES, 
skip_unknown_sections=False)
 
+
 if __name__ == "__main__":
   if options.table_formats is None:
     if options.exploration_strategy not in KNOWN_EXPLORATION_STRATEGIES:
@@ -968,7 +1022,7 @@ if __name__ == "__main__":
       print('Valid values:', ', '.join(KNOWN_EXPLORATION_STRATEGIES))
       sys.exit(1)
 
-    test_vectors = [vector.value for vector in\
+    test_vectors = [vector.value for vector in
         load_table_info_dimension(options.workload, 
options.exploration_strategy)]
   else:
     table_formats = options.table_formats.split(',')
diff --git 
a/testdata/datasets/tpcds_partitioned/tpcds_partitioned_schema_template.sql 
b/testdata/datasets/tpcds_partitioned/tpcds_partitioned_schema_template.sql
index a7ceecda1..6c6b436ff 100644
--- a/testdata/datasets/tpcds_partitioned/tpcds_partitioned_schema_template.sql
+++ b/testdata/datasets/tpcds_partitioned/tpcds_partitioned_schema_template.sql
@@ -35,7 +35,7 @@ t_meal_time               string
 primary key (t_time_sk) DISABLE NOVALIDATE RELY
 ---- DEPENDENT_LOAD
 INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}
-SELECT * FROM tpcds.{table_name};
+SELECT * FROM tpcds{scale_factor}.{table_name};
 ====
 ---- DATASET
 tpcds_partitioned
@@ -73,7 +73,7 @@ d_current_year            string
 primary key (d_date_sk) DISABLE NOVALIDATE RELY
 ---- DEPENDENT_LOAD
 INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}
-SELECT * FROM tpcds.{table_name};
+SELECT * FROM tpcds{scale_factor}.{table_name};
 ====
 ---- DATASET
 tpcds_partitioned
@@ -86,7 +86,7 @@ r_reason_desc         string
 primary key (r_reason_sk) DISABLE NOVALIDATE RELY
 ---- DEPENDENT_LOAD
 INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}
-SELECT * FROM tpcds.{table_name};
+SELECT * FROM tpcds{scale_factor}.{table_name};
 ====
 ---- DATASET
 tpcds_partitioned
@@ -109,7 +109,7 @@ ca_location_type          string
 primary key (ca_address_sk) DISABLE NOVALIDATE RELY
 ---- DEPENDENT_LOAD
 INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}
-SELECT * FROM tpcds.{table_name};
+SELECT * FROM tpcds{scale_factor}.{table_name};
 ====
 ---- DATASET
 tpcds_partitioned
@@ -122,7 +122,7 @@ ib_upper_bound            int
 primary key (ib_income_band_sk) DISABLE NOVALIDATE RELY
 ---- DEPENDENT_LOAD
 INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}
-SELECT * FROM tpcds.{table_name};
+SELECT * FROM tpcds{scale_factor}.{table_name};
 ====
 ---- DATASET
 tpcds_partitioned
@@ -138,7 +138,7 @@ primary key (hd_demo_sk) DISABLE NOVALIDATE RELY
 foreign key (hd_income_band_sk) references {db_name}{db_suffix}.income_band 
(ib_income_band_sk) DISABLE NOVALIDATE RELY
 ---- DEPENDENT_LOAD
 INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}
-SELECT * FROM tpcds.{table_name};
+SELECT * FROM tpcds{scale_factor}.{table_name};
 ====
 ---- DATASET
 tpcds_partitioned
@@ -157,7 +157,7 @@ cd_dep_college_count      int
 primary key (cd_demo_sk) DISABLE NOVALIDATE RELY
 ---- DEPENDENT_LOAD
 INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}
-SELECT * FROM tpcds.{table_name};
+SELECT * FROM tpcds{scale_factor}.{table_name};
 ====
 ---- DATASET
 tpcds_partitioned
@@ -173,7 +173,7 @@ sm_contract               string
 primary key (sm_ship_mode_sk) DISABLE NOVALIDATE RELY
 ---- DEPENDENT_LOAD
 INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}
-SELECT * FROM tpcds.{table_name};
+SELECT * FROM tpcds{scale_factor}.{table_name};
 ====
 ---- DATASET
 tpcds_partitioned
@@ -252,7 +252,7 @@ SELECT
   i_container,
   i_manager_id,
   i_product_name
-FROM tpcds.{table_name};
+FROM tpcds{scale_factor}.{table_name};
 ====
 ---- DATASET
 tpcds_partitioned
@@ -276,7 +276,7 @@ w_gmt_offset              decimal(5,2)
 primary key (w_warehouse_sk) DISABLE NOVALIDATE RELY
 ---- DEPENDENT_LOAD
 INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}
-SELECT * FROM tpcds.{table_name};
+SELECT * FROM tpcds{scale_factor}.{table_name};
 ====
 ---- DATASET
 tpcds_partitioned
@@ -319,7 +319,7 @@ foreign key (cc_closed_date_sk) references 
{db_name}{db_suffix}.date_dim (d_date
 foreign key (cc_open_date_sk) references {db_name}{db_suffix}.date_dim 
(d_date_sk) DISABLE NOVALIDATE RELY
 ---- DEPENDENT_LOAD
 INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}
-SELECT * FROM tpcds.{table_name};
+SELECT * FROM tpcds{scale_factor}.{table_name};
 ====
 ---- DATASET
 tpcds_partitioned
@@ -392,7 +392,7 @@ SELECT
   c_login,
   c_email_address,
   CAST(c_last_review_date AS INT)
-FROM tpcds.{table_name};
+FROM tpcds{scale_factor}.{table_name};
 ====
 ---- DATASET
 tpcds_partitioned
@@ -465,7 +465,7 @@ SELECT
   p_channel_details,
   p_purpose,
   p_discount_active
-FROM tpcds.{table_name};
+FROM tpcds{scale_factor}.{table_name};
 ====
 ---- DATASET
 tpcds_partitioned
@@ -505,7 +505,7 @@ primary key (s_store_sk) DISABLE NOVALIDATE RELY
 foreign key (s_closed_date_sk) references {db_name}{db_suffix}.date_dim 
(d_date_sk) DISABLE NOVALIDATE RELY
 ---- DEPENDENT_LOAD
 INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}
-SELECT * FROM tpcds.{table_name};
+SELECT * FROM tpcds{scale_factor}.{table_name};
 ====
 ---- DATASET
 tpcds_partitioned
@@ -526,7 +526,7 @@ foreign key (cp_start_date_sk) references 
{db_name}{db_suffix}.date_dim (d_date_
 foreign key (cp_end_date_sk) references {db_name}{db_suffix}.date_dim 
(d_date_sk) DISABLE NOVALIDATE RELY
 ---- DEPENDENT_LOAD
 INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}
-SELECT * FROM tpcds.{table_name};
+SELECT * FROM tpcds{scale_factor}.{table_name};
 ====
 ---- DATASET
 tpcds_partitioned
@@ -553,7 +553,7 @@ foreign key (wp_access_date_sk) references 
{db_name}{db_suffix}.date_dim (d_date
 foreign key (wp_customer_sk) references {db_name}{db_suffix}.customer 
(c_customer_sk) DISABLE NOVALIDATE RELY
 ---- DEPENDENT_LOAD
 INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}
-SELECT * FROM tpcds.{table_name};
+SELECT * FROM tpcds{scale_factor}.{table_name};
 ====
 ---- DATASET
 tpcds_partitioned
@@ -591,7 +591,7 @@ foreign key (web_open_date_sk) references 
{db_name}{db_suffix}.date_dim (d_date_
 foreign key (web_close_date_sk) references {db_name}{db_suffix}.date_dim 
(d_date_sk) DISABLE NOVALIDATE RELY
 ---- DEPENDENT_LOAD
 INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}
-SELECT * FROM tpcds.{table_name};
+SELECT * FROM tpcds{scale_factor}.{table_name};
 ====
 ---- DATASET
 tpcds_partitioned
@@ -629,7 +629,7 @@ SELECT
   inv_warehouse_sk,
   inv_quantity_on_hand,
   inv_date_sk
-FROM tpcds.{table_name};
+FROM tpcds{scale_factor}.{table_name};
 ====
 ---- DATASET
 tpcds_partitioned
@@ -762,7 +762,7 @@ SELECT
   cs_net_paid_inc_ship_tax,
   cs_net_profit,
   cs_sold_date_sk
-FROM tpcds.{table_name};
+FROM tpcds{scale_factor}.{table_name};
 ====
 ---- DATASET
 tpcds_partitioned
@@ -874,7 +874,7 @@ SELECT
   cr_store_credit,
   cr_net_loss,
   cr_returned_date_sk
-FROM tpcds.{table_name};
+FROM tpcds{scale_factor}.{table_name};
 ====
 ---- DATASET
 tpcds_partitioned
@@ -966,7 +966,7 @@ SELECT
   ss_net_paid_inc_tax,
   ss_net_profit,
   ss_sold_date_sk
-FROM tpcds.{table_name};
+FROM tpcds{scale_factor}.{table_name};
 ====
 ---- DATASET
 tpcds_partitioned
@@ -1050,7 +1050,7 @@ SELECT
   sr_store_credit,
   sr_net_loss,
   sr_returned_date_sk
-FROM tpcds.{table_name};
+FROM tpcds{scale_factor}.{table_name};
 ====
 ---- DATASET
 tpcds_partitioned
@@ -1183,7 +1183,7 @@ SELECT
   ws_net_paid_inc_ship_tax,
   ws_net_profit,
   ws_sold_date_sk
-FROM tpcds.{table_name};
+FROM tpcds{scale_factor}.{table_name};
 ====
 ---- DATASET
 tpcds_partitioned
@@ -1283,5 +1283,5 @@ SELECT
   wr_account_credit,
   wr_net_loss,
   wr_returned_date_sk
-FROM tpcds.{table_name};
+FROM tpcds{scale_factor}.{table_name};
 ====
diff --git a/tests/util/test_file_parser.py b/tests/util/test_file_parser.py
index 754854f49..d0136a9cf 100644
--- a/tests/util/test_file_parser.py
+++ b/tests/util/test_file_parser.py
@@ -394,7 +394,7 @@ def load_tpc_queries(workload, 
include_stress_queries=False, query_name_filters=
   # find. Both workload directories contain other queries that are not part of 
the TPC
   # spec.
   file_workload = workload
-  if workload == "tpcds":
+  if workload in ["tpcds", "tpcds_partitioned"]:
     # TPCDS is assumed to always use decimal_v2, which is the default since 3.0
     file_workload = "tpcds-decimal_v2"
   if include_stress_queries:
@@ -402,7 +402,11 @@ def load_tpc_queries(workload, 
include_stress_queries=False, query_name_filters=
   else:
     file_name_pattern = re.compile(r"^{0}-(q.*).test$".format(file_workload))
 
-  query_name_pattern = re.compile(r"^{0}-(.*)$".format(workload.upper()))
+  query_name_prefix = workload.upper()
+  if workload == "tpcds_partitioned":
+    # 'tpcds_partitioned' symlinks to the same query set as 'tpcds'.
+    query_name_prefix = "TPCDS"
+  query_name_pattern = re.compile(r"^{0}-(.*)$".format(query_name_prefix))
   if workload == "tpch_nested":
     query_name_pattern = re.compile(r"^TPCH-(.*)$")
 
@@ -426,6 +430,9 @@ def load_tpc_queries(workload, 
include_stress_queries=False, query_name_filters=
     test_cases = parse_query_test_file(file_path)
     for test_case in test_cases:
       query_sql = remove_comments(test_case["QUERY"])
+      if workload == "tpcds_partitioned":
+        # Replace old column names from the old TPC-DS spec with new ones.
+        query_sql = query_sql.replace("c_last_review_date", 
"c_last_review_date_sk")
 
       if re.match(filter_regex, test_case["QUERY_NAME"]):
         query_name_match = query_name_pattern.search(test_case["QUERY_NAME"])


Reply via email to