This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ae4ff47fe7547863df06e5e224b87065a4eaf110
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Thu Oct 3 16:58:58 2024 +0200

    IMPALA-12967, IMPALA-13059, IMPALA-13144, IMPALA-13195: test_migrated_table_field_id_resolution fails in exhaustive mode
    
    When executed in exhaustive mode, multiple instances of
    test_migrated_table_field_id_resolution run in parallel,
    reading and writing the same files, which can lead to various
    errors; hence the multiple Jira tickets in the title.
    
    This patch builds upon rewrite-iceberg-metadata.py so that the
    different test instances load the tables under different
    directories (corresponding to the unique_database).
    
    Change-Id: Id41a78940a5da5344735974e1d2c94ed4f24539a
    Reviewed-on: http://gerrit.cloudera.org:8080/21882
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 testdata/bin/load-test-warehouse-snapshot.sh       |   5 +-
 testdata/bin/rewrite-iceberg-metadata.py           | 109 +---------------
 tests/common/file_utils.py                         |  26 ++--
 .../util/iceberg_metadata_util.py                  | 144 ++++++++++++---------
 4 files changed, 101 insertions(+), 183 deletions(-)

diff --git a/testdata/bin/load-test-warehouse-snapshot.sh 
b/testdata/bin/load-test-warehouse-snapshot.sh
index ff73d11eb..480eaa4b7 100755
--- a/testdata/bin/load-test-warehouse-snapshot.sh
+++ b/testdata/bin/load-test-warehouse-snapshot.sh
@@ -117,8 +117,9 @@ if [ "${TARGET_FILESYSTEM}" != "hdfs" ]; then
   # Need to rewrite test metadata regardless of ${WAREHOUSE_LOCATION_PREFIX} 
because
   # paths can have "hdfs://" scheme
   echo "Updating Iceberg locations with warehouse prefix 
${WAREHOUSE_LOCATION_PREFIX}"
-  ${IMPALA_HOME}/testdata/bin/rewrite-iceberg-metadata.py 
"${WAREHOUSE_LOCATION_PREFIX}" \
-      $(find ${SNAPSHOT_STAGING_DIR}${TEST_WAREHOUSE_DIR}/ -name "metadata")
+  PYTHONPATH=${IMPALA_HOME} 
${IMPALA_HOME}/testdata/bin/rewrite-iceberg-metadata.py \
+    "${WAREHOUSE_LOCATION_PREFIX}" \
+    $(find ${SNAPSHOT_STAGING_DIR}${TEST_WAREHOUSE_DIR}/ -name "metadata")
 fi
 
 echo "Copying data to ${TARGET_FILESYSTEM}"
diff --git a/testdata/bin/rewrite-iceberg-metadata.py 
b/testdata/bin/rewrite-iceberg-metadata.py
index d0c4d40dc..a435b5993 100755
--- a/testdata/bin/rewrite-iceberg-metadata.py
+++ b/testdata/bin/rewrite-iceberg-metadata.py
@@ -18,14 +18,9 @@
 # under the License.
 
 from __future__ import absolute_import, division, print_function
-from builtins import map
-import glob
-import json
-import os
 import sys
+from tests.util.iceberg_metadata_util import rewrite_metadata
 
-from avro.datafile import DataFileReader, DataFileWriter
-from avro.io import DatumReader, DatumWriter
 
 args = sys.argv[1:]
 if len(args) < 2:
@@ -34,103 +29,5 @@ if len(args) < 2:
 
 prefix = args[0]
 
-# Easier to cache it instead of trying to resolve the manifest files paths
-file_size_cache = {}
-
-
-def generate_new_path(prefix, file_path):
-  """ Hive generates metadata with absolute paths.
-  This method relativizes the path and applies a new prefix."""
-  start_directory = "/test-warehouse"
-  start = file_path.find(start_directory)
-  if start == -1:
-    raise RuntimeError("{} is not found in file path:{}".format(
-      start_directory, file_path))
-  return prefix + file_path[start:]
-
-
-def add_prefix_to_snapshot(snapshot):
-  if 'manifest-list' in snapshot:
-    snapshot['manifest-list'] = generate_new_path(prefix, 
snapshot['manifest-list'])
-  if 'manifests' in snapshot:
-    snapshot['manifests'] = [generate_new_path(prefix, m) for m in 
snapshot['manifests']]
-  return snapshot
-
-
-def add_prefix_to_mlog(metadata_log):
-  metadata_log['metadata-file'] = generate_new_path(prefix, 
metadata_log['metadata-file'])
-  return metadata_log
-
-
-def add_prefix_to_snapshot_entry(entry):
-  if 'manifest_path' in entry:
-    entry['manifest_path'] = generate_new_path(prefix, entry['manifest_path'])
-  if 'data_file' in entry:
-    entry['data_file']['file_path'] = generate_new_path(prefix,
-                                      entry['data_file']['file_path'])
-  return entry
-
-
-def fix_manifest_length(entry):
-  if 'manifest_path' in entry and 'manifest_length' in entry:
-    filename = entry['manifest_path'].split('/')[-1]
-    if filename in file_size_cache:
-      entry['manifest_length'] = file_size_cache[filename]
-  return entry
-
-
-for arg in args[1:]:
-  # Update metadata.json
-  for mfile in glob.glob(os.path.join(arg, '*.metadata.json')):
-    with open(mfile, 'r') as f:
-      metadata = json.load(f)
-
-    if 'format-version' not in metadata:
-      print("WARN: skipping {}, missing format-version".format(f))
-      continue
-
-    version = metadata['format-version']
-    if version < 1 or version > 2:
-      print("WARN: skipping {}, unknown version {}".format(f, version))
-      continue
-
-    # metadata: required
-    metadata['location'] = generate_new_path(prefix, metadata['location'])
-
-    # snapshots: optional
-    if 'snapshots' in metadata:
-      metadata['snapshots'] = list(map(add_prefix_to_snapshot, 
metadata['snapshots']))
-
-    # metadata-log: optional
-    if 'metadata-log' in metadata:
-      metadata['metadata-log'] = list(map(add_prefix_to_mlog, 
metadata['metadata-log']))
-
-    with open(mfile + '.tmp', 'w') as f:
-      json.dump(metadata, f, indent=2)
-    os.rename(mfile + '.tmp', mfile)
-
-  for afile in glob.glob(os.path.join(arg, '*.avro')):
-    with open(afile, 'rb') as f:
-      with DataFileReader(f, DatumReader()) as reader:
-        schema = reader.datum_reader.writers_schema
-        lines = list(map(add_prefix_to_snapshot_entry, reader))
-
-    with open(afile + '.tmp', 'wb') as f:
-      with DataFileWriter(f, DatumWriter(), schema) as writer:
-        for line in lines:
-          writer.append(line)
-    os.rename(afile + '.tmp', afile)
-    filename = afile.split('/')[-1]
-    file_size_cache[filename] = os.path.getsize(afile)
-
-  for snapfile in glob.glob(os.path.join(arg, 'snap*.avro')):
-    with open(snapfile, 'rb') as f:
-      with DataFileReader(f, DatumReader()) as reader:
-        schema = reader.datum_reader.writers_schema
-        lines = list(map(fix_manifest_length, reader))
-
-    with open(snapfile + '.tmp', 'wb') as f:
-      with DataFileWriter(f, DatumWriter(), schema) as writer:
-        for line in lines:
-          writer.append(line)
-    os.rename(snapfile + '.tmp', snapfile)
+for metadata_dir in args[1:]:
+  rewrite_metadata(prefix, None, metadata_dir)
diff --git a/tests/common/file_utils.py b/tests/common/file_utils.py
index f3c4e478f..66431dd16 100644
--- a/tests/common/file_utils.py
+++ b/tests/common/file_utils.py
@@ -27,6 +27,7 @@ import tempfile
 from subprocess import check_call
 
 from tests.util.filesystem_utils import get_fs_path, WAREHOUSE_PREFIX
+from tests.util.iceberg_metadata_util import rewrite_metadata
 
 
 def create_iceberg_table_from_directory(impala_client, unique_database, 
table_name,
@@ -38,28 +39,29 @@ def create_iceberg_table_from_directory(impala_client, 
unique_database, table_na
   assert file_format == "orc" or file_format == "parquet"
 
   local_dir = os.path.join(
-    os.environ['IMPALA_HOME'], 
'testdata/data/iceberg_test/{0}'.format(table_name))
+    os.environ['IMPALA_HOME'], 'testdata', 'data', 'iceberg_test', table_name)
   assert os.path.isdir(local_dir)
 
-  # If using a prefix, rewrite iceberg metadata to use the prefix
-  if WAREHOUSE_PREFIX:
-    tmp_dir = tempfile.mktemp(table_name)
-    check_call(['cp', '-r', local_dir, tmp_dir])
-    rewrite = os.path.join(
-        os.environ['IMPALA_HOME'], 'testdata/bin/rewrite-iceberg-metadata.py')
-    check_call([rewrite, WAREHOUSE_PREFIX, os.path.join(tmp_dir, 'metadata')])
-    local_dir = tmp_dir
+  # Rewrite iceberg metadata to use the warehouse prefix and use 
unique_database
+  tmp_dir = tempfile.mktemp(table_name)
+  # Need to create the temp dir so 'cp -r' will copy local dir with its 
original name
+  # under the temp dir. rewrite_metadata() has the assumption that the parent 
directory
+  # of the 'metadata' directory bears the name of the table.
+  check_call(['mkdir', '-p', tmp_dir])
+  check_call(['cp', '-r', local_dir, tmp_dir])
+  local_dir = os.path.join(tmp_dir, table_name)
+  rewrite_metadata(WAREHOUSE_PREFIX, unique_database, os.path.join(local_dir, 
'metadata'))
 
   # Put the directory in the database's directory (not the table directory)
-  hdfs_parent_dir = get_fs_path("/test-warehouse")
-
+  hdfs_parent_dir = os.path.join(get_fs_path("/test-warehouse"), 
unique_database)
   hdfs_dir = os.path.join(hdfs_parent_dir, table_name)
 
   # Purge existing files if any
   check_call(['hdfs', 'dfs', '-rm', '-f', '-r', hdfs_dir])
 
   # Note: -d skips a staging copy
-  check_call(['hdfs', 'dfs', '-put', '-d', local_dir, hdfs_dir])
+  check_call(['hdfs', 'dfs', '-mkdir', '-p', hdfs_parent_dir])
+  check_call(['hdfs', 'dfs', '-put', '-d', local_dir, hdfs_parent_dir])
 
   # Create external table
   qualified_table_name = '{0}.{1}'.format(unique_database, table_name)
diff --git a/testdata/bin/rewrite-iceberg-metadata.py 
b/tests/util/iceberg_metadata_util.py
old mode 100755
new mode 100644
similarity index 58%
copy from testdata/bin/rewrite-iceberg-metadata.py
copy to tests/util/iceberg_metadata_util.py
index d0c4d40dc..f9935b227
--- a/testdata/bin/rewrite-iceberg-metadata.py
+++ b/tests/util/iceberg_metadata_util.py
@@ -1,5 +1,3 @@
-#!/usr/bin/env impala-python
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -19,69 +17,18 @@
 
 from __future__ import absolute_import, division, print_function
 from builtins import map
+from functools import partial
 import glob
 import json
 import os
-import sys
 
 from avro.datafile import DataFileReader, DataFileWriter
 from avro.io import DatumReader, DatumWriter
 
-args = sys.argv[1:]
-if len(args) < 2:
-  print("Usage: rewrite-iceberg-metadata.py PREFIX METADATA-dirs...")
-  exit(1)
-
-prefix = args[0]
-
-# Easier to cache it instead of trying to resolve the manifest files paths
-file_size_cache = {}
-
-
-def generate_new_path(prefix, file_path):
-  """ Hive generates metadata with absolute paths.
-  This method relativizes the path and applies a new prefix."""
-  start_directory = "/test-warehouse"
-  start = file_path.find(start_directory)
-  if start == -1:
-    raise RuntimeError("{} is not found in file path:{}".format(
-      start_directory, file_path))
-  return prefix + file_path[start:]
-
-
-def add_prefix_to_snapshot(snapshot):
-  if 'manifest-list' in snapshot:
-    snapshot['manifest-list'] = generate_new_path(prefix, 
snapshot['manifest-list'])
-  if 'manifests' in snapshot:
-    snapshot['manifests'] = [generate_new_path(prefix, m) for m in 
snapshot['manifests']]
-  return snapshot
-
 
-def add_prefix_to_mlog(metadata_log):
-  metadata_log['metadata-file'] = generate_new_path(prefix, 
metadata_log['metadata-file'])
-  return metadata_log
-
-
-def add_prefix_to_snapshot_entry(entry):
-  if 'manifest_path' in entry:
-    entry['manifest_path'] = generate_new_path(prefix, entry['manifest_path'])
-  if 'data_file' in entry:
-    entry['data_file']['file_path'] = generate_new_path(prefix,
-                                      entry['data_file']['file_path'])
-  return entry
-
-
-def fix_manifest_length(entry):
-  if 'manifest_path' in entry and 'manifest_length' in entry:
-    filename = entry['manifest_path'].split('/')[-1]
-    if filename in file_size_cache:
-      entry['manifest_length'] = file_size_cache[filename]
-  return entry
-
-
-for arg in args[1:]:
+def rewrite_metadata(prefix, unique_database, metadata_dir):
   # Update metadata.json
-  for mfile in glob.glob(os.path.join(arg, '*.metadata.json')):
+  for mfile in glob.glob(os.path.join(metadata_dir, '*.metadata.json')):
     with open(mfile, 'r') as f:
       metadata = json.load(f)
 
@@ -94,26 +41,39 @@ for arg in args[1:]:
       print("WARN: skipping {}, unknown version {}".format(f, version))
       continue
 
+    metadata_parent_dir = os.path.split(metadata_dir.rstrip("/"))[0]
+    table_name = os.path.basename(metadata_parent_dir)
+
+    table_params = (prefix, unique_database, table_name)
+
     # metadata: required
-    metadata['location'] = generate_new_path(prefix, metadata['location'])
+    metadata['location'] = generate_new_path(
+        table_params, metadata['location'])
 
     # snapshots: optional
     if 'snapshots' in metadata:
-      metadata['snapshots'] = list(map(add_prefix_to_snapshot, 
metadata['snapshots']))
+      metadata['snapshots'] = \
+          list(map(partial(add_prefix_to_snapshot, table_params), 
metadata['snapshots']))
 
     # metadata-log: optional
     if 'metadata-log' in metadata:
-      metadata['metadata-log'] = list(map(add_prefix_to_mlog, 
metadata['metadata-log']))
+      metadata['metadata-log'] = \
+          list(map(partial(add_prefix_to_mlog, table_params), 
metadata['metadata-log']))
 
     with open(mfile + '.tmp', 'w') as f:
       json.dump(metadata, f, indent=2)
     os.rename(mfile + '.tmp', mfile)
 
-  for afile in glob.glob(os.path.join(arg, '*.avro')):
+  # Easier to cache it instead of trying to resolve the manifest files paths
+  file_size_cache = {}
+
+  for afile in glob.glob(os.path.join(metadata_dir, '*.avro')):
     with open(afile, 'rb') as f:
       with DataFileReader(f, DatumReader()) as reader:
         schema = reader.datum_reader.writers_schema
-        lines = list(map(add_prefix_to_snapshot_entry, reader))
+        lines = list(map(
+            partial(add_prefix_to_snapshot_entry, table_params),
+            reader))
 
     with open(afile + '.tmp', 'wb') as f:
       with DataFileWriter(f, DatumWriter(), schema) as writer:
@@ -123,14 +83,72 @@ for arg in args[1:]:
     filename = afile.split('/')[-1]
     file_size_cache[filename] = os.path.getsize(afile)
 
-  for snapfile in glob.glob(os.path.join(arg, 'snap*.avro')):
+  for snapfile in glob.glob(os.path.join(metadata_dir, 'snap*.avro')):
     with open(snapfile, 'rb') as f:
       with DataFileReader(f, DatumReader()) as reader:
         schema = reader.datum_reader.writers_schema
-        lines = list(map(fix_manifest_length, reader))
+        lines = list(map(partial(fix_manifest_length, file_size_cache), 
reader))
 
     with open(snapfile + '.tmp', 'wb') as f:
       with DataFileWriter(f, DatumWriter(), schema) as writer:
         for line in lines:
           writer.append(line)
     os.rename(snapfile + '.tmp', snapfile)
+
+
+def generate_new_path(table_params, file_path):
+  """ Hive generates metadata with absolute paths.
+  This method relativizes the path and applies a new prefix."""
+  prefix, unique_database, table_name = table_params
+  start_directory = "/test-warehouse"
+  start = file_path.find(start_directory)
+  if start == -1:
+    raise RuntimeError("{} is not found in file path:{}".format(
+      start_directory, file_path))
+
+  result = file_path[start:]
+  if prefix:
+    result = prefix + result
+
+  if not unique_database:
+    return result
+
+  def replace_last(s, old_expr, new_expr):
+    maxsplit = 1
+    li = s.rsplit(old_expr, maxsplit)
+    assert len(li) == 2
+    return new_expr.join(li)
+
+  return replace_last(result, table_name, "{}/{}".format(unique_database, 
table_name))
+
+
+def add_prefix_to_snapshot(table_params, snapshot):
+  if 'manifest-list' in snapshot:
+    snapshot['manifest-list'] = generate_new_path(table_params, 
snapshot['manifest-list'])
+  if 'manifests' in snapshot:
+    snapshot['manifests'] = [
+        generate_new_path(table_params, m) for m in snapshot['manifests']]
+  return snapshot
+
+
+def add_prefix_to_mlog(table_params, metadata_log):
+  metadata_log['metadata-file'] = generate_new_path(
+      table_params, metadata_log['metadata-file'])
+  return metadata_log
+
+
+def add_prefix_to_snapshot_entry(table_params, entry):
+  if 'manifest_path' in entry:
+    entry['manifest_path'] = generate_new_path(table_params, 
entry['manifest_path'])
+  if 'data_file' in entry:
+    entry['data_file']['file_path'] = generate_new_path(
+        table_params, entry['data_file']['file_path'])
+  return entry
+
+
+def fix_manifest_length(file_size_cache, entry):
+  if 'manifest_path' in entry and 'manifest_length' in entry:
+    filename = entry['manifest_path'].split('/')[-1]
+    if filename in file_size_cache:
+      entry['manifest_length'] = file_size_cache[filename]
+  return entry

Reply via email to