This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 4595371ea IMPALA-11821: Adjusting manifest_length and absolute paths 
in case of metadata rewrite
4595371ea is described below

commit 4595371ea29abdcf0707e17069bfcf984ca5b8e6
Author: Gergely Fürnstáhl <[email protected]>
AuthorDate: Wed Jan 11 16:09:11 2023 +0100

    IMPALA-11821: Adjusting manifest_length and absolute paths in case of 
metadata rewrite
    
    testdata/bin/rewrite-iceberg-metadata.py rewrites manifest and snapshot
    files using the provided prefix for file paths. Snapshot files store
    the length of manifest files as well, this needs to be adjusted too.
    
    Additionally, improved path rewrite to be able to rewrite absolute
    paths correctly and pretty dumping metadata jsons.
    
    Testing:
     - Tested locally, manually verified the rewrites
     - Tested on Ozone, automatically rewriting the test data and running
    test_iceberg.py
    
    Change-Id: I89b9208f25552012cc1ab16fa60a819dd5a683d9
    Reviewed-on: http://gerrit.cloudera.org:8080/19412
    Reviewed-by: Noemi Pap-Takacs <[email protected]>
    Reviewed-by: Michael Smith <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 testdata/bin/rewrite-iceberg-metadata.py | 54 +++++++++++++++++++++++++++-----
 tests/common/skip.py                     |  2 +-
 tests/query_test/test_iceberg.py         |  2 --
 3 files changed, 47 insertions(+), 11 deletions(-)

diff --git a/testdata/bin/rewrite-iceberg-metadata.py 
b/testdata/bin/rewrite-iceberg-metadata.py
index ec165f1d7..1ccee49a7 100755
--- a/testdata/bin/rewrite-iceberg-metadata.py
+++ b/testdata/bin/rewrite-iceberg-metadata.py
@@ -32,31 +32,55 @@ if len(args) < 2:
 
 prefix = args[0]
 
+# Easier to cache it instead of trying to resolve the manifest files paths
+file_size_cache = {}
+
+
+def generate_new_path(prefix, file_path):
+  """ Hive generates metadata with absolute paths.
+  This method relativizes the path and applies a new prefix."""
+  start_directory = "/test-warehouse"
+  start = file_path.find(start_directory)
+  if start == -1:
+    raise RuntimeError("{} is not found in file path:{}".format(
+      start_directory, file_path))
+  return prefix + file_path[start:]
+
 
 def add_prefix_to_snapshot(snapshot):
   if 'manifest-list' in snapshot:
-    snapshot['manifest-list'] = prefix + snapshot['manifest-list']
+    snapshot['manifest-list'] = generate_new_path(prefix, 
snapshot['manifest-list'])
   if 'manifests' in snapshot:
-    snapshot['manifests'] = map(lambda m: prefix + m, snapshot['manifests'])
+    snapshot['manifests'] = map(lambda m: generate_new_path(prefix, m),
+                                snapshot['manifests'])
   return snapshot
 
 
 def add_prefix_to_mlog(metadata_log):
-  metadata_log['metadata-file'] = prefix + metadata_log['metadata-file']
+  metadata_log['metadata-file'] = generate_new_path(prefix, 
metadata_log['metadata-file'])
   return metadata_log
 
 
 def add_prefix_to_snapshot_entry(entry):
   if 'manifest_path' in entry:
-    entry['manifest_path'] = prefix + entry['manifest_path']
+    entry['manifest_path'] = generate_new_path(prefix, entry['manifest_path'])
   if 'data_file' in entry:
-    entry['data_file']['file_path'] = prefix + entry['data_file']['file_path']
+    entry['data_file']['file_path'] = generate_new_path(prefix,
+                                      entry['data_file']['file_path'])
+  return entry
+
+
+def fix_manifest_length(entry):
+  if 'manifest_path' in entry and 'manifest_length' in entry:
+    filename = entry['manifest_path'].split('/')[-1]
+    if filename in file_size_cache:
+      entry['manifest_length'] = file_size_cache[filename]
   return entry
 
 
 for arg in args[1:]:
   # Update metadata.json
-  for mfile in glob.glob(os.path.join(arg, 'v*.metadata.json')):
+  for mfile in glob.glob(os.path.join(arg, '*.metadata.json')):
     with open(mfile, 'r') as f:
       metadata = json.load(f)
 
@@ -70,7 +94,7 @@ for arg in args[1:]:
       continue
 
     # metadata: required
-    metadata['location'] = prefix + metadata['location']
+    metadata['location'] = generate_new_path(prefix, metadata['location'])
 
     # snapshots: optional
     if 'snapshots' in metadata:
@@ -81,7 +105,7 @@ for arg in args[1:]:
       metadata['metadata-log'] = map(add_prefix_to_mlog, 
metadata['metadata-log'])
 
     with open(mfile + '.tmp', 'w') as f:
-      json.dump(metadata, f)
+      json.dump(metadata, f, indent=2)
     os.rename(mfile + '.tmp', mfile)
 
   for afile in glob.glob(os.path.join(arg, '*.avro')):
@@ -95,3 +119,17 @@ for arg in args[1:]:
         for line in lines:
           writer.append(line)
     os.rename(afile + '.tmp', afile)
+    filename = afile.split('/')[-1]
+    file_size_cache[filename] = os.path.getsize(afile)
+
+  for snapfile in glob.glob(os.path.join(arg, 'snap*.avro')):
+    with open(snapfile, 'rb') as f:
+      with DataFileReader(f, DatumReader()) as reader:
+        schema = reader.datum_reader.writers_schema
+        lines = map(fix_manifest_length, reader)
+
+    with open(snapfile + '.tmp', 'wb') as f:
+      with DataFileWriter(f, DatumWriter(), schema) as writer:
+        for line in lines:
+          writer.append(line)
+    os.rename(snapfile + '.tmp', snapfile)
diff --git a/tests/common/skip.py b/tests/common/skip.py
index 648204395..85d2c678d 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -104,7 +104,7 @@ class SkipIf:
   sfs_unsupported = pytest.mark.skipif(not (IS_HDFS or IS_S3 or IS_ABFS or 
IS_ADLS
       or IS_GCS), reason="Hive support for sfs+ is limited, HIVE-26757")
   hardcoded_uris = pytest.mark.skipif(not IS_HDFS,
-      reason="Iceberg hardcodes the full URI in parquet delete files and 
metadata files")
+      reason="Iceberg delete files hardcode the full URI in parquet files")
   not_ec = pytest.mark.skipif(not IS_EC, reason="Erasure Coding needed")
   no_secondary_fs = pytest.mark.skipif(not SECONDARY_FILESYSTEM,
       reason="Secondary filesystem needed")
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 062a6cc12..9ae75885f 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -842,7 +842,6 @@ class TestIcebergTable(IcebergTestSuite):
     self.run_test_case('QueryTest/iceberg-multiple-storage-locations-table',
                        vector, unique_database)
 
-  @SkipIf.hardcoded_uris
   def test_mixed_file_format(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-mixed-file-format', vector,
                       unique_database)
@@ -958,7 +957,6 @@ class TestIcebergTable(IcebergTestSuite):
   def test_virtual_columns(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-virtual-columns', vector, 
unique_database)
 
-  @SkipIf.hardcoded_uris
   def test_avro_file_format(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-avro', vector, unique_database)
 

Reply via email to