This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 166b39547 IMPALA-14553: Run schema eval concurrently
166b39547 is described below

commit 166b39547e033956e3f5c941cb36165c59a18275
Author: Michael Smith <[email protected]>
AuthorDate: Mon Nov 3 16:46:05 2025 -0800

    IMPALA-14553: Run schema eval concurrently
    
    The majority of time spent in generate-schema-statements.py is in
    eval_section for schema operations that shell out, often uploading files
    via the hadoop CLI or generating data files. These operations should be
    independent.
    
    Runs eval_section once for each section at the beginning, so it is not
    repeated for each row in test_vectors, and executes the shell commands
    in parallel via a ThreadPool. Defaults to NUM_CONCURRENT_TESTS threads
    because the underlying operations have some concurrency to them (such
    as HDFS mirroring writes).
    
    Also collects existing tables into a set to optimize lookup.
    
    Reduces generate-schema-statements runtime by ~60%, from 2m30s to 1m.
    Confirmed that contents of logs/data_loading/sql/functional are
    identical.
    
    Change-Id: I2a78d05fd6a0005c83561978713237da2dde6af2
    Reviewed-on: http://gerrit.cloudera.org:8080/23627
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Michael Smith <[email protected]>
---
 testdata/bin/generate-schema-statements.py | 185 +++++++++++++++++++++--------
 1 file changed, 136 insertions(+), 49 deletions(-)

diff --git a/testdata/bin/generate-schema-statements.py b/testdata/bin/generate-schema-statements.py
index 229c49891..84e550d01 100755
--- a/testdata/bin/generate-schema-statements.py
+++ b/testdata/bin/generate-schema-statements.py
@@ -104,6 +104,8 @@ import shutil
 import subprocess
 import sys
 import tempfile
+import time
+from multiprocessing.pool import AsyncResult, ThreadPool
 from optparse import OptionParser
 from tests.common.environ import HIVE_MAJOR_VERSION
 from tests.util.test_file_parser import parse_table_constraints, parse_test_file
@@ -136,6 +138,9 @@ parser.add_option("--table_formats", dest="table_formats", default=None,
                   "formats. Ex. --table_formats=seq/snap/block,text/none")
 parser.add_option("--hdfs_namenode", dest="hdfs_namenode", default="localhost:20500",
                   help="HDFS name node for Avro schema URLs, default localhost:20500")
+parser.add_option("--num_processes", type="int", dest="num_processes",
+                  default=os.environ['NUM_CONCURRENT_TESTS'],
+                  help="Number of parallel processes to use.")
 (options, args) = parser.parse_args()
 
 if options.workload is None:
@@ -699,13 +704,13 @@ def build_hbase_create_stmt(db_name, table_name, column_families, region_splits)
 def get_hdfs_subdirs_with_data(path):
   tmp_file = tempfile.TemporaryFile("w+")
   cmd = "hadoop fs -du %s | grep -v '^0' | awk '{print $3}'" % path
-  subprocess.call([cmd], shell=True, stderr=open('/dev/null'), stdout=tmp_file)
+  subprocess.check_call([cmd], shell=True, stderr=open('/dev/null'), stdout=tmp_file)
   tmp_file.seek(0)
 
   # Results look like:
   # <acls> -  <user> <group> <date> /directory/subdirectory
   # So to get subdirectory names just return everything after the last '/'
-  return [line[line.rfind('/') + 1:].strip() for line in tmp_file.readlines()]
+  return set([line[line.rfind('/') + 1:].strip() for line in tmp_file.readlines()])
 
 
 class Statements(object):
@@ -727,20 +732,108 @@ class Statements(object):
     return bool(self.create or self.load or self.load_base)
 
 
-def eval_section(section_str):
+def eval_section(pool, section_str):
   """section_str should be the contents of a section (i.e. a string). If section_str
-  starts with `, evaluates section_str as a shell command and returns the
-  output. Otherwise returns section_str."""
+  starts with `, evaluates section_str as a shell command in pool and returns an
+  AsyncResult to produce the output. Otherwise returns section_str. Results of this
+  function should be passed to unwrap() to get the actual value."""
+  function should be passed to unwrap() to get the actual value."""
   if not section_str.startswith('`'): return section_str
   cmd = section_str[1:]
   # Use bash explicitly instead of setting shell=True so we get more advanced shell
   # features (e.g. "for i in {1..n}")
-  p = subprocess.Popen(['/bin/bash', '-c', cmd], stdout=subprocess.PIPE,
-      universal_newlines=True)
-  stdout, stderr = p.communicate()
-  if stderr: print(stderr)
-  assert p.returncode == 0
-  return stdout.strip()
+  return pool.apply_async(subprocess.check_output,
+      (['/bin/bash', '-c', cmd],), {'universal_newlines': True})
+
+
+def unwrap(result):
+  """If result is an AsyncResult, get the actual value from it. Otherwise return
+  result."""
+  if type(result) is AsyncResult:
+    # If the command produced no output, return a newline so this section is still
+    # treated as having been set.
+    return result.get() or '\n'
+  return result
+
+
+def eval_sections(test_vectors, sections, fails_only_constraint,
+                  fails_include_constraint, fails_exclude_constraint):
+  """Evaluates all sections that are shell commands in parallel using a thread pool.
+  Returns a new list of sections with all shell commands evaluated."""
+  new_sections = list()
+  table_names = None
+  if options.table_names:
+    table_names = [name.lower() for name in options.table_names.split(',')]
+  file_formats = [row.file_format for row in test_vectors]
+  table_formats = [f"{row.file_format}/{row.compression_codec}/{row.compression_type}"
+                   for row in test_vectors]
+
+  # Sections are re-used for multiple test vectors, but eval_section is only needed once.
+  # Use a threadpool to execute eval_section in parallel as they shell out.
+  pool = ThreadPool(processes=options.num_processes)
+  for section in sections:
+    table_name = section['BASE_TABLE_NAME'].strip().lower()
+
+    if table_names and (table_name.lower() not in table_names):
+      print(f"Skipping table '{table_name}': table is not in specified table list")
+      continue
+
+    # Check Hive version requirement, if present.
+    if section['HIVE_MAJOR_VERSION'] and \
+        section['HIVE_MAJOR_VERSION'].strip() != \
+        os.environ['IMPALA_HIVE_MAJOR_VERSION'].strip():
+      print(f"Skipping table '{table_name}': wrong Hive major version")
+      continue
+
+    if all([fails_only_constraint(table_name, format) for format in table_formats]):
+      print(f"Skipping table '{table_name}': 'only' constraint for formats did not "
+            "include this table.")
+      continue
+
+    if all([fails_include_constraint(table_name, format) for format in table_formats]):
+      print(f"Skipping '{table_name}' due to include constraint matches.")
+      continue
+
+    if all([fails_exclude_constraint(table_name, format) for format in table_formats]):
+      print(f"Skipping '{table_name}' due to exclude constraint matches.")
+      continue
+
+    assert not (section['CREATE'] and section['CREATE_HIVE']), \
+        "Can't set both CREATE and CREATE_HIVE"
+
+    assert not (section['DEPENDENT_LOAD'] and section['DEPENDENT_LOAD_HIVE']), \
+        "Can't set both DEPENDENT_LOAD and DEPENDENT_LOAD_HIVE"
+
+    section['LOAD'] = eval_section(pool, section['LOAD'])
+
+    section['DEPENDENT_LOAD'] = eval_section(pool, section['DEPENDENT_LOAD'])
+    section['DEPENDENT_LOAD_HIVE'] = eval_section(pool, section['DEPENDENT_LOAD_HIVE'])
+
+    if 'kudu' in file_formats and section['DEPENDENT_LOAD_KUDU']:
+      section['DEPENDENT_LOAD_KUDU'] = eval_section(pool, section['DEPENDENT_LOAD_KUDU'])
+
+    if 'orc' in file_formats and section["DEPENDENT_LOAD_ACID"]:
+      section["DEPENDENT_LOAD_ACID"] = eval_section(pool, section["DEPENDENT_LOAD_ACID"])
+
+    if 'json' in file_formats and section["DEPENDENT_LOAD_JSON"]:
+      section["DEPENDENT_LOAD_JSON"] = eval_section(pool, section["DEPENDENT_LOAD_JSON"])
+
+    section['COLUMNS'] = eval_section(pool, section['COLUMNS'])
+    new_sections.append(section)
+
+  # Close the pool to new tasks and collect the results.
+  pool.close()
+  pool.join()
+  for section in new_sections:
+    # Ensure all async commands are done.
+    section['LOAD'] = unwrap(section['LOAD'])
+    section['DEPENDENT_LOAD'] = unwrap(section['DEPENDENT_LOAD'])
+    section['DEPENDENT_LOAD_HIVE'] = unwrap(section['DEPENDENT_LOAD_HIVE'])
+    section['DEPENDENT_LOAD_KUDU'] = unwrap(section['DEPENDENT_LOAD_KUDU'])
+    section["DEPENDENT_LOAD_ACID"] = unwrap(section["DEPENDENT_LOAD_ACID"])
+    section["DEPENDENT_LOAD_JSON"] = unwrap(section["DEPENDENT_LOAD_JSON"])
+    section['COLUMNS'] = unwrap(section['COLUMNS'])
+
+  return new_sections
 
 
 def generate_statements(output_name, test_vectors, sections,
@@ -748,14 +841,27 @@ def generate_statements(output_name, test_vectors, sections,
                         schema_only_constraints, convert_orc_to_full_acid):
   # TODO: This method has become very unwieldy. It has to be re-factored sooner than
   # later.
-  # Parquet statements to be executed separately by Impala
+  def fails_only_constraint(table_name, table_format):
+    constraint = schema_only_constraints.get(table_format)
+    return constraint is not None and table_name not in constraint
+
+  def fails_include_constraint(table_name, table_format):
+    constraint = schema_include_constraints.get(table_name)
+    return constraint is not None and table_format not in constraint
+
+  def fails_exclude_constraint(table_name, table_format):
+    constraint = schema_exclude_constraints.get(table_name)
+    return constraint is not None and table_format in constraint
+
+  start = time.time()
+  sections = eval_sections(test_vectors, sections, fails_only_constraint,
+                           fails_include_constraint, fails_exclude_constraint)
+  print(f"Evaluating sections took {time.time() - start:.3f} seconds")
+
   hbase_output = Statements()
   hbase_post_load = Statements()
   impala_invalidate = Statements()
 
-  table_names = None
-  if options.table_names:
-    table_names = [name.lower() for name in options.table_names.split(',')]
   existing_tables = get_hdfs_subdirs_with_data(options.hive_warehouse_dir)
   for row in test_vectors:
     impala_create = Statements()
@@ -772,62 +878,43 @@ def generate_statements(output_name, test_vectors, sections,
     for section in sections:
       table_name = section['BASE_TABLE_NAME'].strip()
 
-      if table_names and (table_name.lower() not in table_names):
-        print('Skipping table: %s.%s, table is not in specified table list' %
-              (db, table_name))
+      if fails_only_constraint(table_name.lower(), table_format):
+        print(f"Skipping table: {db}.{table_name}, 'only' constraint for format did not "
+              "include this table.")
         continue
 
-      # Check Hive version requirement, if present.
-      if section['HIVE_MAJOR_VERSION'] and \
-         section['HIVE_MAJOR_VERSION'].strip() != \
-         os.environ['IMPALA_HIVE_MAJOR_VERSION'].strip():
-        print("Skipping table '{0}.{1}': wrong Hive major version".format(db, table_name))
+      if fails_include_constraint(table_name.lower(), table_format):
+        print(f"Skipping '{db}.{table_name}' due to include constraint match.")
         continue
 
-      if table_format in schema_only_constraints and \
-         table_name.lower() not in schema_only_constraints[table_format]:
-        print(('Skipping table: %s.%s, \'only\' constraint for format did not '
-              'include this table.') % (db, table_name))
+      if fails_exclude_constraint(table_name.lower(), table_format):
+        print(f"Skipping '{db}.{table_name}' due to exclude constraint match.")
         continue
 
-      if schema_include_constraints[table_name.lower()] and \
-         table_format not in schema_include_constraints[table_name.lower()]:
-        print('Skipping \'%s.%s\' due to include constraint match.' % (db, table_name))
-        continue
-
-      if schema_exclude_constraints[table_name.lower()] and\
-         table_format in schema_exclude_constraints[table_name.lower()]:
-        print('Skipping \'%s.%s\' due to exclude constraint match.' % (db, table_name))
-        continue
-
-      alter = section.get('ALTER')
+      alter = section['ALTER']
       create = section['CREATE']
       create_hive = section['CREATE_HIVE']
-      assert not (create and create_hive), "Can't set both CREATE and CREATE_HIVE"
-
       table_properties = section['TABLE_PROPERTIES']
-      insert = eval_section(section['DEPENDENT_LOAD'])
-      insert_hive = eval_section(section['DEPENDENT_LOAD_HIVE'])
-      assert not (insert and insert_hive),\
-          "Can't set both DEPENDENT_LOAD and DEPENDENT_LOAD_HIVE"
-      load = eval_section(section['LOAD'])
+      insert = section['DEPENDENT_LOAD']
+      insert_hive = section['DEPENDENT_LOAD_HIVE']
+      load = section['LOAD']
 
       if file_format == 'kudu':
         create_kudu = section["CREATE_KUDU"]
         if section['DEPENDENT_LOAD_KUDU']:
-          insert = eval_section(section['DEPENDENT_LOAD_KUDU'])
+          insert = section['DEPENDENT_LOAD_KUDU']
       else:
         create_kudu = None
 
       if file_format == 'orc' and section["DEPENDENT_LOAD_ACID"]:
         insert = None
-        insert_hive = eval_section(section["DEPENDENT_LOAD_ACID"])
+        insert_hive = section["DEPENDENT_LOAD_ACID"]
 
       if file_format == 'json' and section["DEPENDENT_LOAD_JSON"]:
         insert = None
-        insert_hive = eval_section(section["DEPENDENT_LOAD_JSON"])
+        insert_hive = section["DEPENDENT_LOAD_JSON"]
 
-      columns = eval_section(section['COLUMNS']).strip()
+      columns = section['COLUMNS'].strip()
       partition_columns = section['PARTITION_COLUMNS'].strip()
       row_format = section['ROW_FORMAT'].strip()
       table_comment = section['COMMENT'].strip()

Reply via email to