This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit ae4ff47fe7547863df06e5e224b87065a4eaf110 Author: Zoltan Borok-Nagy <[email protected]> AuthorDate: Thu Oct 3 16:58:58 2024 +0200 IMPALA-12967, IMPALA-13059, IMPALA-13144, IMPALA-13195: test_migrated_table_field_id_resolution fails in exhaustive mode When executed in exhaustive mode, multiple instances of test_migrated_table_field_id_resolution are running in parallel, reading and writing the same files, which can lead to various errors, hence the multiple Jira tickets in the title. Building upon rewrite-iceberg-metadata.py, with this patch the different test instances load the tables under different directories (corresponding to the unique_database). Change-Id: Id41a78940a5da5344735974e1d2c94ed4f24539a Reviewed-on: http://gerrit.cloudera.org:8080/21882 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- testdata/bin/load-test-warehouse-snapshot.sh | 5 +- testdata/bin/rewrite-iceberg-metadata.py | 109 +--------------- tests/common/file_utils.py | 26 ++-- .../util/iceberg_metadata_util.py | 144 ++++++++++++--------- 4 files changed, 101 insertions(+), 183 deletions(-) diff --git a/testdata/bin/load-test-warehouse-snapshot.sh b/testdata/bin/load-test-warehouse-snapshot.sh index ff73d11eb..480eaa4b7 100755 --- a/testdata/bin/load-test-warehouse-snapshot.sh +++ b/testdata/bin/load-test-warehouse-snapshot.sh @@ -117,8 +117,9 @@ if [ "${TARGET_FILESYSTEM}" != "hdfs" ]; then # Need to rewrite test metadata regardless of ${WAREHOUSE_LOCATION_PREFIX} because # paths can have "hdfs://" scheme echo "Updating Iceberg locations with warehouse prefix ${WAREHOUSE_LOCATION_PREFIX}" - ${IMPALA_HOME}/testdata/bin/rewrite-iceberg-metadata.py "${WAREHOUSE_LOCATION_PREFIX}" \ - $(find ${SNAPSHOT_STAGING_DIR}${TEST_WAREHOUSE_DIR}/ -name "metadata") + PYTHONPATH=${IMPALA_HOME} ${IMPALA_HOME}/testdata/bin/rewrite-iceberg-metadata.py \ + "${WAREHOUSE_LOCATION_PREFIX}" \ + $(find ${SNAPSHOT_STAGING_DIR}${TEST_WAREHOUSE_DIR}/ 
-name "metadata") fi echo "Copying data to ${TARGET_FILESYSTEM}" diff --git a/testdata/bin/rewrite-iceberg-metadata.py b/testdata/bin/rewrite-iceberg-metadata.py index d0c4d40dc..a435b5993 100755 --- a/testdata/bin/rewrite-iceberg-metadata.py +++ b/testdata/bin/rewrite-iceberg-metadata.py @@ -18,14 +18,9 @@ # under the License. from __future__ import absolute_import, division, print_function -from builtins import map -import glob -import json -import os import sys +from tests.util.iceberg_metadata_util import rewrite_metadata -from avro.datafile import DataFileReader, DataFileWriter -from avro.io import DatumReader, DatumWriter args = sys.argv[1:] if len(args) < 2: @@ -34,103 +29,5 @@ if len(args) < 2: prefix = args[0] -# Easier to cache it instead of trying to resolve the manifest files paths -file_size_cache = {} - - -def generate_new_path(prefix, file_path): - """ Hive generates metadata with absolute paths. - This method relativizes the path and applies a new prefix.""" - start_directory = "/test-warehouse" - start = file_path.find(start_directory) - if start == -1: - raise RuntimeError("{} is not found in file path:{}".format( - start_directory, file_path)) - return prefix + file_path[start:] - - -def add_prefix_to_snapshot(snapshot): - if 'manifest-list' in snapshot: - snapshot['manifest-list'] = generate_new_path(prefix, snapshot['manifest-list']) - if 'manifests' in snapshot: - snapshot['manifests'] = [generate_new_path(prefix, m) for m in snapshot['manifests']] - return snapshot - - -def add_prefix_to_mlog(metadata_log): - metadata_log['metadata-file'] = generate_new_path(prefix, metadata_log['metadata-file']) - return metadata_log - - -def add_prefix_to_snapshot_entry(entry): - if 'manifest_path' in entry: - entry['manifest_path'] = generate_new_path(prefix, entry['manifest_path']) - if 'data_file' in entry: - entry['data_file']['file_path'] = generate_new_path(prefix, - entry['data_file']['file_path']) - return entry - - -def fix_manifest_length(entry): 
- if 'manifest_path' in entry and 'manifest_length' in entry: - filename = entry['manifest_path'].split('/')[-1] - if filename in file_size_cache: - entry['manifest_length'] = file_size_cache[filename] - return entry - - -for arg in args[1:]: - # Update metadata.json - for mfile in glob.glob(os.path.join(arg, '*.metadata.json')): - with open(mfile, 'r') as f: - metadata = json.load(f) - - if 'format-version' not in metadata: - print("WARN: skipping {}, missing format-version".format(f)) - continue - - version = metadata['format-version'] - if version < 1 or version > 2: - print("WARN: skipping {}, unknown version {}".format(f, version)) - continue - - # metadata: required - metadata['location'] = generate_new_path(prefix, metadata['location']) - - # snapshots: optional - if 'snapshots' in metadata: - metadata['snapshots'] = list(map(add_prefix_to_snapshot, metadata['snapshots'])) - - # metadata-log: optional - if 'metadata-log' in metadata: - metadata['metadata-log'] = list(map(add_prefix_to_mlog, metadata['metadata-log'])) - - with open(mfile + '.tmp', 'w') as f: - json.dump(metadata, f, indent=2) - os.rename(mfile + '.tmp', mfile) - - for afile in glob.glob(os.path.join(arg, '*.avro')): - with open(afile, 'rb') as f: - with DataFileReader(f, DatumReader()) as reader: - schema = reader.datum_reader.writers_schema - lines = list(map(add_prefix_to_snapshot_entry, reader)) - - with open(afile + '.tmp', 'wb') as f: - with DataFileWriter(f, DatumWriter(), schema) as writer: - for line in lines: - writer.append(line) - os.rename(afile + '.tmp', afile) - filename = afile.split('/')[-1] - file_size_cache[filename] = os.path.getsize(afile) - - for snapfile in glob.glob(os.path.join(arg, 'snap*.avro')): - with open(snapfile, 'rb') as f: - with DataFileReader(f, DatumReader()) as reader: - schema = reader.datum_reader.writers_schema - lines = list(map(fix_manifest_length, reader)) - - with open(snapfile + '.tmp', 'wb') as f: - with DataFileWriter(f, DatumWriter(), schema) as 
writer: - for line in lines: - writer.append(line) - os.rename(snapfile + '.tmp', snapfile) +for metadata_dir in args[1:]: + rewrite_metadata(prefix, None, metadata_dir) diff --git a/tests/common/file_utils.py b/tests/common/file_utils.py index f3c4e478f..66431dd16 100644 --- a/tests/common/file_utils.py +++ b/tests/common/file_utils.py @@ -27,6 +27,7 @@ import tempfile from subprocess import check_call from tests.util.filesystem_utils import get_fs_path, WAREHOUSE_PREFIX +from tests.util.iceberg_metadata_util import rewrite_metadata def create_iceberg_table_from_directory(impala_client, unique_database, table_name, @@ -38,28 +39,29 @@ def create_iceberg_table_from_directory(impala_client, unique_database, table_na assert file_format == "orc" or file_format == "parquet" local_dir = os.path.join( - os.environ['IMPALA_HOME'], 'testdata/data/iceberg_test/{0}'.format(table_name)) + os.environ['IMPALA_HOME'], 'testdata', 'data', 'iceberg_test', table_name) assert os.path.isdir(local_dir) - # If using a prefix, rewrite iceberg metadata to use the prefix - if WAREHOUSE_PREFIX: - tmp_dir = tempfile.mktemp(table_name) - check_call(['cp', '-r', local_dir, tmp_dir]) - rewrite = os.path.join( - os.environ['IMPALA_HOME'], 'testdata/bin/rewrite-iceberg-metadata.py') - check_call([rewrite, WAREHOUSE_PREFIX, os.path.join(tmp_dir, 'metadata')]) - local_dir = tmp_dir + # Rewrite iceberg metadata to use the warehouse prefix and use unique_database + tmp_dir = tempfile.mktemp(table_name) + # Need to create the temp dir so 'cp -r' will copy local dir with its original name + # under the temp dir. rewrite_metadata() has the assumption that the parent directory + # of the 'metadata' directory bears the name of the table. 
+ check_call(['mkdir', '-p', tmp_dir]) + check_call(['cp', '-r', local_dir, tmp_dir]) + local_dir = os.path.join(tmp_dir, table_name) + rewrite_metadata(WAREHOUSE_PREFIX, unique_database, os.path.join(local_dir, 'metadata')) # Put the directory in the database's directory (not the table directory) - hdfs_parent_dir = get_fs_path("/test-warehouse") - + hdfs_parent_dir = os.path.join(get_fs_path("/test-warehouse"), unique_database) hdfs_dir = os.path.join(hdfs_parent_dir, table_name) # Purge existing files if any check_call(['hdfs', 'dfs', '-rm', '-f', '-r', hdfs_dir]) # Note: -d skips a staging copy - check_call(['hdfs', 'dfs', '-put', '-d', local_dir, hdfs_dir]) + check_call(['hdfs', 'dfs', '-mkdir', '-p', hdfs_parent_dir]) + check_call(['hdfs', 'dfs', '-put', '-d', local_dir, hdfs_parent_dir]) # Create external table qualified_table_name = '{0}.{1}'.format(unique_database, table_name) diff --git a/testdata/bin/rewrite-iceberg-metadata.py b/tests/util/iceberg_metadata_util.py old mode 100755 new mode 100644 similarity index 58% copy from testdata/bin/rewrite-iceberg-metadata.py copy to tests/util/iceberg_metadata_util.py index d0c4d40dc..f9935b227 --- a/testdata/bin/rewrite-iceberg-metadata.py +++ b/tests/util/iceberg_metadata_util.py @@ -1,5 +1,3 @@ -#!/usr/bin/env impala-python -# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. 
See the NOTICE file # distributed with this work for additional information @@ -19,69 +17,18 @@ from __future__ import absolute_import, division, print_function from builtins import map +from functools import partial import glob import json import os -import sys from avro.datafile import DataFileReader, DataFileWriter from avro.io import DatumReader, DatumWriter -args = sys.argv[1:] -if len(args) < 2: - print("Usage: rewrite-iceberg-metadata.py PREFIX METADATA-dirs...") - exit(1) - -prefix = args[0] - -# Easier to cache it instead of trying to resolve the manifest files paths -file_size_cache = {} - - -def generate_new_path(prefix, file_path): - """ Hive generates metadata with absolute paths. - This method relativizes the path and applies a new prefix.""" - start_directory = "/test-warehouse" - start = file_path.find(start_directory) - if start == -1: - raise RuntimeError("{} is not found in file path:{}".format( - start_directory, file_path)) - return prefix + file_path[start:] - - -def add_prefix_to_snapshot(snapshot): - if 'manifest-list' in snapshot: - snapshot['manifest-list'] = generate_new_path(prefix, snapshot['manifest-list']) - if 'manifests' in snapshot: - snapshot['manifests'] = [generate_new_path(prefix, m) for m in snapshot['manifests']] - return snapshot - -def add_prefix_to_mlog(metadata_log): - metadata_log['metadata-file'] = generate_new_path(prefix, metadata_log['metadata-file']) - return metadata_log - - -def add_prefix_to_snapshot_entry(entry): - if 'manifest_path' in entry: - entry['manifest_path'] = generate_new_path(prefix, entry['manifest_path']) - if 'data_file' in entry: - entry['data_file']['file_path'] = generate_new_path(prefix, - entry['data_file']['file_path']) - return entry - - -def fix_manifest_length(entry): - if 'manifest_path' in entry and 'manifest_length' in entry: - filename = entry['manifest_path'].split('/')[-1] - if filename in file_size_cache: - entry['manifest_length'] = file_size_cache[filename] - return entry - - 
-for arg in args[1:]: +def rewrite_metadata(prefix, unique_database, metadata_dir): # Update metadata.json - for mfile in glob.glob(os.path.join(arg, '*.metadata.json')): + for mfile in glob.glob(os.path.join(metadata_dir, '*.metadata.json')): with open(mfile, 'r') as f: metadata = json.load(f) @@ -94,26 +41,39 @@ for arg in args[1:]: print("WARN: skipping {}, unknown version {}".format(f, version)) continue + metadata_parent_dir = os.path.split(metadata_dir.rstrip("/"))[0] + table_name = os.path.basename(metadata_parent_dir) + + table_params = (prefix, unique_database, table_name) + # metadata: required - metadata['location'] = generate_new_path(prefix, metadata['location']) + metadata['location'] = generate_new_path( + table_params, metadata['location']) # snapshots: optional if 'snapshots' in metadata: - metadata['snapshots'] = list(map(add_prefix_to_snapshot, metadata['snapshots'])) + metadata['snapshots'] = \ + list(map(partial(add_prefix_to_snapshot, table_params), metadata['snapshots'])) # metadata-log: optional if 'metadata-log' in metadata: - metadata['metadata-log'] = list(map(add_prefix_to_mlog, metadata['metadata-log'])) + metadata['metadata-log'] = \ + list(map(partial(add_prefix_to_mlog, table_params), metadata['metadata-log'])) with open(mfile + '.tmp', 'w') as f: json.dump(metadata, f, indent=2) os.rename(mfile + '.tmp', mfile) - for afile in glob.glob(os.path.join(arg, '*.avro')): + # Easier to cache it instead of trying to resolve the manifest files paths + file_size_cache = {} + + for afile in glob.glob(os.path.join(metadata_dir, '*.avro')): with open(afile, 'rb') as f: with DataFileReader(f, DatumReader()) as reader: schema = reader.datum_reader.writers_schema - lines = list(map(add_prefix_to_snapshot_entry, reader)) + lines = list(map( + partial(add_prefix_to_snapshot_entry, table_params), + reader)) with open(afile + '.tmp', 'wb') as f: with DataFileWriter(f, DatumWriter(), schema) as writer: @@ -123,14 +83,72 @@ for arg in args[1:]: filename 
= afile.split('/')[-1] file_size_cache[filename] = os.path.getsize(afile) - for snapfile in glob.glob(os.path.join(arg, 'snap*.avro')): + for snapfile in glob.glob(os.path.join(metadata_dir, 'snap*.avro')): with open(snapfile, 'rb') as f: with DataFileReader(f, DatumReader()) as reader: schema = reader.datum_reader.writers_schema - lines = list(map(fix_manifest_length, reader)) + lines = list(map(partial(fix_manifest_length, file_size_cache), reader)) with open(snapfile + '.tmp', 'wb') as f: with DataFileWriter(f, DatumWriter(), schema) as writer: for line in lines: writer.append(line) os.rename(snapfile + '.tmp', snapfile) + + +def generate_new_path(table_params, file_path): + """ Hive generates metadata with absolute paths. + This method relativizes the path and applies a new prefix.""" + prefix, unique_database, table_name = table_params + start_directory = "/test-warehouse" + start = file_path.find(start_directory) + if start == -1: + raise RuntimeError("{} is not found in file path:{}".format( + start_directory, file_path)) + + result = file_path[start:] + if prefix: + result = prefix + result + + if not unique_database: + return result + + def replace_last(s, old_expr, new_expr): + maxsplit = 1 + li = s.rsplit(old_expr, maxsplit) + assert len(li) == 2 + return new_expr.join(li) + + return replace_last(result, table_name, "{}/{}".format(unique_database, table_name)) + + +def add_prefix_to_snapshot(table_params, snapshot): + if 'manifest-list' in snapshot: + snapshot['manifest-list'] = generate_new_path(table_params, snapshot['manifest-list']) + if 'manifests' in snapshot: + snapshot['manifests'] = [ + generate_new_path(table_params, m) for m in snapshot['manifests']] + return snapshot + + +def add_prefix_to_mlog(table_params, metadata_log): + metadata_log['metadata-file'] = generate_new_path( + table_params, metadata_log['metadata-file']) + return metadata_log + + +def add_prefix_to_snapshot_entry(table_params, entry): + if 'manifest_path' in entry: + 
entry['manifest_path'] = generate_new_path(table_params, entry['manifest_path']) + if 'data_file' in entry: + entry['data_file']['file_path'] = generate_new_path( + table_params, entry['data_file']['file_path']) + return entry + + +def fix_manifest_length(file_size_cache, entry): + if 'manifest_path' in entry and 'manifest_length' in entry: + filename = entry['manifest_path'].split('/')[-1] + if filename in file_size_cache: + entry['manifest_length'] = file_size_cache[filename] + return entry
