This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 20fe8fea58061f5da7d5c0e7d26755712d02ef79 Author: Riza Suminto <[email protected]> AuthorDate: Mon Jan 9 14:55:05 2023 -0800 IMPALA-11658: Implement Iceberg manifest caching config for Impala Impala needs to supply Iceberg's catalog properties to enable the manifest caching feature. This commit implements the necessary config reading. Iceberg-related config is read from hadoop-conf.xml and supplied as a Map in catalog instantiation. Additionally, this patch also replaces the deprecated RuntimeIOException with its superclass, UncheckedIOException. Testing: - Pass core tests. - Checked that manifest caching works through debug logging. Change-Id: I5a60a700d2ae6302dfe395d1ef602e6b1d821888 Reviewed-on: http://gerrit.cloudera.org:8080/19423 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../impala/catalog/iceberg/IcebergCatalog.java | 14 +++++++++ .../impala/catalog/iceberg/IcebergCatalogs.java | 15 ---------- .../catalog/iceberg/IcebergHadoopCatalog.java | 9 +++--- .../catalog/iceberg/IcebergHadoopTables.java | 5 ++-- .../impala/catalog/iceberg/IcebergHiveCatalog.java | 4 ++- .../java/org/apache/impala/util/IcebergUtil.java | 34 +++++++++++++++++++++- .../common/etc/hadoop/conf/core-site.xml.py | 7 +++++ 7 files changed, 65 insertions(+), 23 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalog.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalog.java index 9dd693d15..1aa239ba2 100644 --- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalog.java @@ -72,4 +72,18 @@ public interface IcebergCatalog { * For HadoopCatalog, Iceberg implement 'renameTable' method with Exception threw */ void renameTable(FeIcebergTable feTable, TableIdentifier newTableId); + + /** + * Some of the implemetation methods might be running on native threads as they might + * be invoked via
JNI. In that case the context class loader for those threads are + * null. 'Catalogs' uses JNDI to load the catalog implementations, e.g. HadoopCatalog + * or HiveCatalog. JNDI uses the context class loader, but as it is null it falls back + * to the bootstrap class loader that doesn't have the Iceberg classes on its classpath. + * To avoid ClassNotFoundException we set the context class loader to the class loader + * that loaded this class. + */ + default void setContextClassLoader() { + if (Thread.currentThread().getContextClassLoader() != null) return; + Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader()); + } } diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalogs.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalogs.java index 2c1620a60..488cf348c 100644 --- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalogs.java +++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalogs.java @@ -30,7 +30,6 @@ import org.apache.iceberg.SchemaParser; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchTableException; -import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.hadoop.ConfigProperties; import org.apache.iceberg.mr.Catalogs; import org.apache.iceberg.mr.InputFormatConfig; @@ -160,18 +159,4 @@ public class IcebergCatalogs implements IcebergCatalog { tableProps.get(IcebergTable.ICEBERG_CATALOG)); return properties; } - - /** - * Some of the above methods might be running on native threads as they might be invoked - * via JNI. In that case the context class loader for those threads are null. 'Catalogs' - * uses JNDI to load the catalog implementations, e.g. HadoopCatalog or HiveCatalog. - * JNDI uses the context class loader, but as it is null it falls back to the bootstrap - * class loader that doesn't have the Iceberg classes on its classpath. 
- * To avoid ClassNotFoundException we set the context class loader to the class loader - * that loaded this class. - */ - private void setContextClassLoader() { - if (Thread.currentThread().getContextClassLoader() != null) return; - Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader()); - } } diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopCatalog.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopCatalog.java index 287b004b3..80d542dee 100644 --- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopCatalog.java @@ -17,16 +17,16 @@ package org.apache.impala.catalog.iceberg; +import java.io.UncheckedIOException; import java.lang.NullPointerException; -import java.util.HashMap; import java.util.Map; + import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchTableException; -import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.impala.catalog.FeIcebergTable; import org.apache.impala.catalog.TableLoadingException; @@ -47,8 +47,9 @@ public class IcebergHadoopCatalog implements IcebergCatalog { private HadoopCatalog hadoopCatalog; public IcebergHadoopCatalog(String catalogLocation) { + setContextClassLoader(); hadoopCatalog = new HadoopCatalog(); - Map<String, String> props = new HashMap<>(); + Map<String, String> props = IcebergUtil.composeCatalogProperties(); props.put(CatalogProperties.WAREHOUSE_LOCATION, catalogLocation); hadoopCatalog.setConf(FileSystemUtil.getConfiguration()); hadoopCatalog.initialize("", props); @@ -86,7 +87,7 @@ public class IcebergHadoopCatalog implements IcebergCatalog { return hadoopCatalog.loadTable(tableId); } catch 
(NoSuchTableException e) { throw new TableLoadingException(e.getMessage()); - } catch (NullPointerException | RuntimeIOException e) { + } catch (NullPointerException | UncheckedIOException e) { if (attempt == MAX_ATTEMPTS - 1) { // Throw exception on last attempt. throw new TableLoadingException(String.format( diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopTables.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopTables.java index 42017f367..5a99c0b56 100644 --- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopTables.java +++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopTables.java @@ -17,14 +17,15 @@ package org.apache.impala.catalog.iceberg; +import java.io.UncheckedIOException; import java.lang.NullPointerException; import java.util.Map; + import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchTableException; -import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.impala.catalog.FeIcebergTable; import org.apache.impala.catalog.TableLoadingException; @@ -85,7 +86,7 @@ public class IcebergHadoopTables implements IcebergCatalog { return hadoopTables.load(tableLocation); } catch (NoSuchTableException e) { throw new TableLoadingException(e.getMessage()); - } catch (NullPointerException | RuntimeIOException e) { + } catch (NullPointerException | UncheckedIOException e) { if (attempt == MAX_ATTEMPTS - 1) { // Throw exception on last attempt. 
throw new TableLoadingException(String.format( diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHiveCatalog.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHiveCatalog.java index 9f7e5b719..b4b5f4b32 100644 --- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHiveCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHiveCatalog.java @@ -51,11 +51,13 @@ public class IcebergHiveCatalog implements IcebergCatalog { private HiveCatalog hiveCatalog_; private IcebergHiveCatalog() { + setContextClassLoader(); HiveConf conf = new HiveConf(IcebergHiveCatalog.class); conf.setBoolean(ConfigProperties.ENGINE_HIVE_ENABLED, true); hiveCatalog_ = new HiveCatalog(); hiveCatalog_.setConf(conf); - hiveCatalog_.initialize("ImpalaHiveCatalog", new HashMap<>()); + Map<String, String> properties = IcebergUtil.composeCatalogProperties(); + hiveCatalog_.initialize("ImpalaHiveCatalog", properties); } @Override diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java index 72859527e..2c304b00c 100644 --- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java +++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java @@ -34,12 +34,14 @@ import java.time.ZoneOffset; import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; @@ -56,6 +58,7 @@ import org.apache.iceberg.TableScan; import org.apache.iceberg.Transaction; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.hadoop.HadoopFileIO; 
import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.mr.Catalogs; import org.apache.iceberg.transforms.PartitionSpecVisitor; @@ -79,6 +82,7 @@ import org.apache.impala.catalog.iceberg.IcebergCatalogs; import org.apache.impala.catalog.iceberg.IcebergHadoopCatalog; import org.apache.impala.catalog.iceberg.IcebergHadoopTables; import org.apache.impala.catalog.iceberg.IcebergHiveCatalog; +import org.apache.impala.common.FileSystemUtil; import org.apache.impala.common.ImpalaRuntimeException; import org.apache.impala.common.Pair; import org.apache.impala.fb.FbFileMetadata; @@ -1033,4 +1037,32 @@ public class IcebergUtil { IcebergPartitionSpec spec) { return getPartitionTransformType(column, spec) != TIcebergPartitionTransformType.VOID; } + + /** + * Compose Iceberg catalog properties from Hadoop Configuration. + */ + public static Map<String, String> composeCatalogProperties() { + Configuration conf = FileSystemUtil.getConfiguration(); + Map<String, String> props = new HashMap<>(); + List<String> configKeys = new ArrayList<>(Arrays.asList( + CatalogProperties.FILE_IO_IMPL, CatalogProperties.IO_MANIFEST_CACHE_ENABLED, + CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS, + CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES, + CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH)); + + for (String key : configKeys) { + String val = conf.get("iceberg." + key); + if (val != null) { + props.put(key, val); + } + } + + if (!props.containsKey(CatalogProperties.FILE_IO_IMPL)) { + // Manifest caching only enabled if "io-impl" is specified. Default to HadoopFileIO + // if non-existent. 
+ props.put(CatalogProperties.FILE_IO_IMPL, HadoopFileIO.class.getName()); + } + + return props; + } } diff --git a/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py b/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py index 499cba249..bcb17edfa 100644 --- a/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py +++ b/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py @@ -118,6 +118,13 @@ CONFIG = { 'fs.oss.endpoint': '${OSS_ACCESS_ENDPOINT}', 'fs.oss.impl': 'org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem', 'fs.AbstractFileSystem.oss.impl': 'org.apache.hadoop.fs.aliyun.oss.OSS', + + # Manifest caching configuration for Iceberg. + 'iceberg.io-impl': 'org.apache.iceberg.hadoop.HadoopFileIO', + 'iceberg.io.manifest.cache-enabled': 'true', + 'iceberg.io.manifest.cache.expiration-interval-ms': '60000', + 'iceberg.io.manifest.cache.max-total-bytes': '104857600', + 'iceberg.io.manifest.cache.max-content-length': '8388608', } if target_filesystem == 's3':
