This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit f7e629935b77f412bf74aeebd704af88f03de351
Author: halim.kim <[email protected]>
AuthorDate: Wed Jul 19 08:57:19 2023 +0900

    IMPALA-11871: Skip permissions loading and check on HDFS if Ranger is 
enabled
    
    Before this patch, Impala checked whether the Impala service user had
    the WRITE access to the target HDFS table/partition(s) during the
    analysis of the INSERT and LOAD DATA statements in the legacy catalog
    mode. The access levels of the corresponding HDFS table and partitions
    were computed by the catalog server solely based on the HDFS permissions
    and ACLs when the table and partitions were instantiated.
    
    After this patch, we skip loading HDFS permissions and assume the
    Impala service user has the READ_WRITE permission on all the HDFS paths
    associated with the target table during query analysis when Ranger is
    enabled. The assumption could be removed once Impala's implementation
    of FsPermissionChecker additionally takes Ranger's policies for HDFS
    into consideration when performing the check.
    
    Testing:
     - Added end-to-end tests to verify Impala's behavior with respect to
       the INSERT and LOAD DATA statements when Ranger is enabled in the
       legacy catalog mode.
    
    Change-Id: Id33c400fbe0c918b6b65d713b09009512835a4c9
    Reviewed-on: http://gerrit.cloudera.org:8080/20221
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 bin/rat_exclude_files.txt                          |   1 +
 .../java/org/apache/impala/catalog/HdfsTable.java  |  14 +-
 testdata/data/load_data_with_catalog_v1.txt        |   2 +
 tests/authorization/test_ranger.py                 | 147 +++++++++++++++++++++
 4 files changed, 163 insertions(+), 1 deletion(-)

diff --git a/bin/rat_exclude_files.txt b/bin/rat_exclude_files.txt
index 51f6acadc..bcd7df3a1 100644
--- a/bin/rat_exclude_files.txt
+++ b/bin/rat_exclude_files.txt
@@ -176,6 +176,7 @@ testdata/data/iceberg_test/*
 testdata/data/json_test/*
 testdata/data/sfs_d2.txt
 testdata/data/sfs_d4.txt
+testdata/data/load_data_with_catalog_v1.txt
 testdata/datasets/functional/functional_schema_template.sql
 testdata/impala-profiles/README
 testdata/impala-profiles/impala_profile_log_tpcds_compute_stats
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 8954eab0b..7bc4edb48 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -843,7 +843,7 @@ public class HdfsTable extends Table implements FeFsTable {
    * permissions Impala has on the given path. If the path does not exist, 
recurses up
    * the path until a existing parent directory is found, and inherit access 
permissions
    * from that.
-   * Always returns READ_WRITE for S3, ADLS, GCS and COS files.
+   * Always returns READ_WRITE for S3, ADLS, GCS, COS files, and 
Ranger-enabled HDFS.
    */
   private static TAccessLevel getAvailableAccessLevel(String tableName,
       Path location, FsPermissionCache permCache) throws IOException {
@@ -881,6 +881,18 @@ public class HdfsTable extends Table implements FeFsTable {
     // LocalFsTable. Remove this once we resolve IMPALA-7539.
     if (BackendConfig.INSTANCE.isMinimalTopicMode()) return true;
 
+    // Authorization on Ranger-enabled HDFS can be managed via the HDFS policy
+    // repository, and thus it is possible to check the existence of necessary
+    // permissions via RPC to the Hadoop NameNode, or Ranger REST API. Before
+    // IMPALA-12994 is resolved, we assume the Impala service user has the 
READ_WRITE
+    // permission on all HDFS files in the case when Ranger is enabled in 
Impala.
+    // This also avoids significant overhead when the number of table 
partitions gets
+    // bigger.
+    if (FileSystemUtil.isDistributedFileSystem(fs) &&
+        
BackendConfig.INSTANCE.getAuthorizationProvider().equalsIgnoreCase("ranger")) {
+      return true;
+    }
+
     // Avoid calling getPermissions() on file path for S3 files, as that makes 
a round
     // trip to S3. Also, the S3A connector is currently unable to manage S3 
permissions,
     // so for now it is safe to assume that all files(objects) have READ_WRITE
diff --git a/testdata/data/load_data_with_catalog_v1.txt 
b/testdata/data/load_data_with_catalog_v1.txt
new file mode 100644
index 000000000..cdd419ca5
--- /dev/null
+++ b/testdata/data/load_data_with_catalog_v1.txt
@@ -0,0 +1,2 @@
+Adam
+Alex
diff --git a/tests/authorization/test_ranger.py 
b/tests/authorization/test_ranger.py
index f83680614..45b3515ea 100644
--- a/tests/authorization/test_ranger.py
+++ b/tests/authorization/test_ranger.py
@@ -29,6 +29,7 @@ from subprocess import check_call
 
 from getpass import getuser
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.file_utils import copy_files_to_hdfs_dir
 from tests.common.skip import SkipIfFS, SkipIfHive2, SkipIf
 from tests.common.test_dimensions import (create_client_protocol_dimension,
     create_exec_option_dimension, create_orc_dimension)
@@ -76,6 +77,152 @@ class TestRanger(CustomClusterTestSuite):
     self._test_grant_revoke(unique_name, [None, "invalidate metadata",
                                           "refresh authorization"])
 
+  @pytest.mark.execute_serially
+  @SkipIfFS.hdfs_acls
+  @CustomClusterTestSuite.with_args(
+    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
+  def test_insert_with_catalog_v1(self, unique_name):
+    """
+    Test that when Ranger is the authorization provider in the legacy catalog 
mode,
+    Impala does not throw an AnalysisException when an authorized user tries 
to execute
+    an INSERT query against a partitioned table of which the respective table 
path and
+    the partition path are not writable according to HDFS permission.
+    """
+    user = getuser()
+    admin_client = self.create_impala_client()
+    unique_database = unique_name + "_db"
+    unique_table = unique_name + "_tbl"
+    partition_column = "year"
+    partition_value = "2008"
+    table_path = "test-warehouse/{0}.db/{1}".format(unique_database, 
unique_table)
+    table_partition_path = "{0}/{1}={2}"\
+        .format(table_path, partition_column, partition_value)
+    insert_statement = "insert into {0}.{1} (name) partition ({2}) " \
+        "values (\"Adam\", {3})".format(unique_database, unique_table, 
partition_column,
+        partition_value)
+    authz_err = "AuthorizationException: User '{0}' does not have privileges 
to " \
+        "execute 'INSERT' on: {1}.{2}".format(user, unique_database, 
unique_table)
+    try:
+      admin_client.execute("drop database if exists {0} cascade"
+          .format(unique_database), user=ADMIN)
+      admin_client.execute("create database {0}".format(unique_database), 
user=ADMIN)
+      admin_client.execute("create table {0}.{1} (name string) partitioned by 
({2} int)"
+          .format(unique_database, unique_table, partition_column), user=ADMIN)
+      admin_client.execute("alter table {0}.{1} add partition ({2}={3})"
+          .format(unique_database, unique_table, partition_column, 
partition_value),
+          user=ADMIN)
+
+      # Change the owner user and group of the HDFS paths corresponding to the 
table and
+      # the partition so that according to Impala's FsPermissionChecker, the 
table is not
+      # writable to the user that loads the table. This user usually is the one
+      # representing the Impala service. Before IMPALA-11871, changing either 
the table
+      # path or the partition path to non-writable would result in an 
AnalysisException.
+      self.hdfs_client.chown(table_path, "another_user", "another_group")
+      self.hdfs_client.chown(table_partition_path, "another_user", 
"another_group")
+      # Invalidate the table metadata to force the catalog server to reload 
the HDFS
+      # table and the related partition(s).
+      admin_client.execute("invalidate metadata {0}.{1}"
+          .format(unique_database, unique_table), user=ADMIN)
+
+      # Verify that the INSERT statement fails with AuthorizationException 
because the
+      # requesting user does not have the INSERT privilege on the table.
+      result = self._run_query_as_user(insert_statement, user, False)
+      assert authz_err in str(result)
+
+      admin_client.execute("grant insert on table {0}.{1} to user {2}"
+          .format(unique_database, unique_table, user), user=ADMIN)
+      # Verify that the INSERT statement succeeds without AnalysisException.
+      self._run_query_as_user(insert_statement, user, True)
+    finally:
+      admin_client.execute("revoke insert on table {0}.{1} from user {2}"
+          .format(unique_database, unique_table, user))
+      admin_client.execute("drop database if exists {0} cascade"
+          .format(unique_database), user=ADMIN)
+
+  @pytest.mark.execute_serially
+  @SkipIfFS.hdfs_acls
+  @CustomClusterTestSuite.with_args(
+    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
+  def test_load_data_with_catalog_v1(self, unique_name):
+    """
+    Test that when Ranger is the authorization provider in the legacy catalog 
mode,
+    Impala does not throw an AnalysisException when an authorized user tries 
to execute
+    a LOAD DATA query against a table partition of which the respective 
partition path is
+    not writable according to Impala's FsPermissionChecker.
+    """
+    user = getuser()
+    admin_client = self.create_impala_client()
+    unique_database = unique_name + "_db"
+    unique_table = unique_name + "_tbl"
+    partition_column = "year"
+    partition_value = "2008"
+    destination_table_path = "test-warehouse/{0}.db/{1}" \
+        .format(unique_database, unique_table, )
+    destination_table_partition_path = "{0}/{1}={2}"\
+        .format(destination_table_path, partition_column, partition_value)
+    file_name = "load_data_with_catalog_v1.txt"
+    files_for_table = ["testdata/data/{0}".format(file_name)]
+    source_hdfs_dir = "/tmp"
+    load_data_statement = "load data inpath '{0}/{1}' into table {2}.{3} " \
+        "partition ({4}={5})".format(source_hdfs_dir, file_name, 
unique_database,
+        unique_table, partition_column, partition_value)
+    authz_err = "AuthorizationException: User '{0}' does not have privileges 
to " \
+        "execute 'INSERT' on: {1}.{2}".format(user, unique_database, 
unique_table)
+    try:
+      admin_client.execute("drop database if exists {0} cascade"
+          .format(unique_database), user=ADMIN)
+      admin_client.execute("create database {0}".format(unique_database), 
user=ADMIN)
+      copy_files_to_hdfs_dir(files_for_table, source_hdfs_dir)
+      admin_client.execute("create table {0}.{1} (name string) partitioned by 
({2} int) "
+          "row format delimited fields terminated by ',' "
+          "stored as textfile".format(unique_database, unique_table, 
partition_column),
+          user=ADMIN)
+      # We need to add the partition. Otherwise, the LOAD DATA statement can't 
create new
+      # partitions.
+      admin_client.execute("alter table {0}.{1} add partition ({2}={3})"
+          .format(unique_database, unique_table, partition_column, 
partition_value),
+          user=ADMIN)
+
+      # Change the permissions of the HDFS path of the destination table 
partition.
+      # Before IMPALA-11871, even we changed the table path to non-writable, 
loading
+      # data into the partition was still allowed if the destination partition 
path
+      # was writable according to Impala's FsPermissionChecker. But if the 
destination
+      # partition path was not writable, an AnalysisException would be thrown.
+      self.hdfs_client.chown(destination_table_partition_path, "another_user",
+          "another_group")
+      # Invalidate the table metadata to force the catalog server to reload 
the HDFS
+      # table and the related partition(s).
+      admin_client.execute("invalidate metadata {0}.{1}"
+          .format(unique_database, unique_table), user=ADMIN)
+
+      # To execute the LOAD DATA statement, a user has to be granted the ALL 
privilege
+      # on the source HDFS path and the INSERT privilege on the destination 
table.
+      # The following verifies the LOAD DATA statement fails with 
AuthorizationException
+      # due to insufficient privileges.
+      result = self._run_query_as_user(load_data_statement, user, False)
+      assert authz_err in str(result)
+
+      admin_client.execute("grant all on uri '{0}/{1}' to user {2}"
+          .format(source_hdfs_dir, file_name, user), user=ADMIN)
+      # The following verifies the ALL privilege on the source file alone is 
not
+      # sufficient to execute the LOAD DATA statement.
+      result = self._run_query_as_user(load_data_statement, user, False)
+      assert authz_err in str(result)
+
+      admin_client.execute("grant insert on table {0}.{1} to user {2}"
+          .format(unique_database, unique_table, user), user=ADMIN)
+      # Verify the LOAD DATA statement fails without AnalysisException.
+      self._run_query_as_user(load_data_statement, user, True)
+    finally:
+      admin_client.execute("revoke all on uri '{0}/{1}' from user {2}"
+          .format(source_hdfs_dir, file_name, user), user=ADMIN)
+      admin_client.execute("revoke insert on table {0}.{1} from user {2}"
+          .format(unique_database, unique_table, user), user=ADMIN)
+      admin_client.execute("drop database if exists {0} cascade"
+          .format(unique_database), user=ADMIN)
+      self.filesystem_client.delete_file_dir("{0}/{1}"
+          .format(source_hdfs_dir, file_name))
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
     impalad_args="{0} {1}".format(IMPALAD_ARGS, "--use_local_catalog=true"),

Reply via email to