This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit f7e629935b77f412bf74aeebd704af88f03de351 Author: halim.kim <[email protected]> AuthorDate: Wed Jul 19 08:57:19 2023 +0900 IMPALA-11871: Skip permissions loading and check on HDFS if Ranger is enabled Before this patch, Impala checked whether the Impala service user had the WRITE access to the target HDFS table/partition(s) during the analysis of the INSERT and LOAD DATA statements in the legacy catalog mode. The access levels of the corresponding HDFS table and partitions were computed by the catalog server solely based on the HDFS permissions and ACLs when the table and partitions were instantiated. After this patch, we skip loading HDFS permissions and assume the Impala service user has the READ_WRITE permission on all the HDFS paths associated with the target table during query analysis when Ranger is enabled. The assumption could be removed after Impala's implementation of FsPermissionChecker could additionally take Ranger's policies of HDFS into consideration when performing the check. Testing: - Added end-to-end tests to verify Impala's behavior with respect to the INSERT and LOAD DATA statements when Ranger is enabled in the legacy catalog mode. 
Change-Id: Id33c400fbe0c918b6b65d713b09009512835a4c9 Reviewed-on: http://gerrit.cloudera.org:8080/20221 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- bin/rat_exclude_files.txt | 1 + .../java/org/apache/impala/catalog/HdfsTable.java | 14 +- testdata/data/load_data_with_catalog_v1.txt | 2 + tests/authorization/test_ranger.py | 147 +++++++++++++++++++++ 4 files changed, 163 insertions(+), 1 deletion(-) diff --git a/bin/rat_exclude_files.txt b/bin/rat_exclude_files.txt index 51f6acadc..bcd7df3a1 100644 --- a/bin/rat_exclude_files.txt +++ b/bin/rat_exclude_files.txt @@ -176,6 +176,7 @@ testdata/data/iceberg_test/* testdata/data/json_test/* testdata/data/sfs_d2.txt testdata/data/sfs_d4.txt +testdata/data/load_data_with_catalog_v1.txt testdata/datasets/functional/functional_schema_template.sql testdata/impala-profiles/README testdata/impala-profiles/impala_profile_log_tpcds_compute_stats diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index 8954eab0b..7bc4edb48 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -843,7 +843,7 @@ public class HdfsTable extends Table implements FeFsTable { * permissions Impala has on the given path. If the path does not exist, recurses up * the path until a existing parent directory is found, and inherit access permissions * from that. - * Always returns READ_WRITE for S3, ADLS, GCS and COS files. + * Always returns READ_WRITE for S3, ADLS, GCS, COS files, and Ranger-enabled HDFS. */ private static TAccessLevel getAvailableAccessLevel(String tableName, Path location, FsPermissionCache permCache) throws IOException { @@ -881,6 +881,18 @@ public class HdfsTable extends Table implements FeFsTable { // LocalFsTable. Remove this once we resolve IMPALA-7539. 
if (BackendConfig.INSTANCE.isMinimalTopicMode()) return true; + // Authorization on Ranger-enabled HDFS can be managed via the HDFS policy + // repository, and thus it is possible to check the existence of necessary + // permissions via RPC to the Hadoop NameNode, or Ranger REST API. Before + // IMPALA-12994 is resolved, we assume the Impala service user has the READ_WRITE + // permission on all HDFS files in the case when Ranger is enabled in Impala. + // This also avoids significant overhead when the number of table partitions gets + // bigger. + if (FileSystemUtil.isDistributedFileSystem(fs) && + BackendConfig.INSTANCE.getAuthorizationProvider().equalsIgnoreCase("ranger")) { + return true; + } + // Avoid calling getPermissions() on file path for S3 files, as that makes a round // trip to S3. Also, the S3A connector is currently unable to manage S3 permissions, // so for now it is safe to assume that all files(objects) have READ_WRITE diff --git a/testdata/data/load_data_with_catalog_v1.txt b/testdata/data/load_data_with_catalog_v1.txt new file mode 100644 index 000000000..cdd419ca5 --- /dev/null +++ b/testdata/data/load_data_with_catalog_v1.txt @@ -0,0 +1,2 @@ +Adam +Alex diff --git a/tests/authorization/test_ranger.py b/tests/authorization/test_ranger.py index f83680614..45b3515ea 100644 --- a/tests/authorization/test_ranger.py +++ b/tests/authorization/test_ranger.py @@ -29,6 +29,7 @@ from subprocess import check_call from getpass import getuser from tests.common.custom_cluster_test_suite import CustomClusterTestSuite +from tests.common.file_utils import copy_files_to_hdfs_dir from tests.common.skip import SkipIfFS, SkipIfHive2, SkipIf from tests.common.test_dimensions import (create_client_protocol_dimension, create_exec_option_dimension, create_orc_dimension) @@ -76,6 +77,152 @@ class TestRanger(CustomClusterTestSuite): self._test_grant_revoke(unique_name, [None, "invalidate metadata", "refresh authorization"]) + @pytest.mark.execute_serially + 
@SkipIfFS.hdfs_acls + @CustomClusterTestSuite.with_args( + impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) + def test_insert_with_catalog_v1(self, unique_name): + """ + Test that when Ranger is the authorization provider in the legacy catalog mode, + Impala does not throw an AnalysisException when an authorized user tries to execute + an INSERT query against a partitioned table of which the respective table path and + the partition path are not writable according to HDFS permission. + """ + user = getuser() + admin_client = self.create_impala_client() + unique_database = unique_name + "_db" + unique_table = unique_name + "_tbl" + partition_column = "year" + partition_value = "2008" + table_path = "test-warehouse/{0}.db/{1}".format(unique_database, unique_table) + table_partition_path = "{0}/{1}={2}"\ + .format(table_path, partition_column, partition_value) + insert_statement = "insert into {0}.{1} (name) partition ({2}) " \ + "values (\"Adam\", {3})".format(unique_database, unique_table, partition_column, + partition_value) + authz_err = "AuthorizationException: User '{0}' does not have privileges to " \ + "execute 'INSERT' on: {1}.{2}".format(user, unique_database, unique_table) + try: + admin_client.execute("drop database if exists {0} cascade" + .format(unique_database), user=ADMIN) + admin_client.execute("create database {0}".format(unique_database), user=ADMIN) + admin_client.execute("create table {0}.{1} (name string) partitioned by ({2} int)" + .format(unique_database, unique_table, partition_column), user=ADMIN) + admin_client.execute("alter table {0}.{1} add partition ({2}={3})" + .format(unique_database, unique_table, partition_column, partition_value), + user=ADMIN) + + # Change the owner user and group of the HDFS paths corresponding to the table and + # the partition so that according to Impala's FsPermissionChecker, the table is not + # writable to the user that loads the table. This user usually is the one + # representing the Impala service. 
Before IMPALA-11871, changing either the table + # path or the partition path to non-writable would result in an AnalysisException. + self.hdfs_client.chown(table_path, "another_user", "another_group") + self.hdfs_client.chown(table_partition_path, "another_user", "another_group") + # Invalidate the table metadata to force the catalog server to reload the HDFS + # table and the related partition(s). + admin_client.execute("invalidate metadata {0}.{1}" + .format(unique_database, unique_table), user=ADMIN) + + # Verify that the INSERT statement fails with AuthorizationException because the + # requesting user does not have the INSERT privilege on the table. + result = self._run_query_as_user(insert_statement, user, False) + assert authz_err in str(result) + + admin_client.execute("grant insert on table {0}.{1} to user {2}" + .format(unique_database, unique_table, user), user=ADMIN) + # Verify that the INSERT statement succeeds without AnalysisException. + self._run_query_as_user(insert_statement, user, True) + finally: + admin_client.execute("revoke insert on table {0}.{1} from user {2}" + .format(unique_database, unique_table, user)) + admin_client.execute("drop database if exists {0} cascade" + .format(unique_database), user=ADMIN) + + @pytest.mark.execute_serially + @SkipIfFS.hdfs_acls + @CustomClusterTestSuite.with_args( + impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) + def test_load_data_with_catalog_v1(self, unique_name): + """ + Test that when Ranger is the authorization provider in the legacy catalog mode, + Impala does not throw an AnalysisException when an authorized user tries to execute + a LOAD DATA query against a table partition of which the respective partition path is + not writable according to Impala's FsPermissionChecker. 
+ """ + user = getuser() + admin_client = self.create_impala_client() + unique_database = unique_name + "_db" + unique_table = unique_name + "_tbl" + partition_column = "year" + partition_value = "2008" + destination_table_path = "test-warehouse/{0}.db/{1}" \ + .format(unique_database, unique_table, ) + destination_table_partition_path = "{0}/{1}={2}"\ + .format(destination_table_path, partition_column, partition_value) + file_name = "load_data_with_catalog_v1.txt" + files_for_table = ["testdata/data/{0}".format(file_name)] + source_hdfs_dir = "/tmp" + load_data_statement = "load data inpath '{0}/{1}' into table {2}.{3} " \ + "partition ({4}={5})".format(source_hdfs_dir, file_name, unique_database, + unique_table, partition_column, partition_value) + authz_err = "AuthorizationException: User '{0}' does not have privileges to " \ + "execute 'INSERT' on: {1}.{2}".format(user, unique_database, unique_table) + try: + admin_client.execute("drop database if exists {0} cascade" + .format(unique_database), user=ADMIN) + admin_client.execute("create database {0}".format(unique_database), user=ADMIN) + copy_files_to_hdfs_dir(files_for_table, source_hdfs_dir) + admin_client.execute("create table {0}.{1} (name string) partitioned by ({2} int) " + "row format delimited fields terminated by ',' " + "stored as textfile".format(unique_database, unique_table, partition_column), + user=ADMIN) + # We need to add the partition. Otherwise, the LOAD DATA statement can't create new + # partitions. + admin_client.execute("alter table {0}.{1} add partition ({2}={3})" + .format(unique_database, unique_table, partition_column, partition_value), + user=ADMIN) + + # Change the permissions of the HDFS path of the destination table partition. + # Before IMPALA-11871, even we changed the table path to non-writable, loading + # data into the partition was still allowed if the destination partition path + # was writable according to Impala's FsPermissionChecker. 
But if the destination + # partition path was not writable, an AnalysisException would be thrown. + self.hdfs_client.chown(destination_table_partition_path, "another_user", + "another_group") + # Invalidate the table metadata to force the catalog server to reload the HDFS + # table and the related partition(s). + admin_client.execute("invalidate metadata {0}.{1}" + .format(unique_database, unique_table), user=ADMIN) + + # To execute the LOAD DATA statement, a user has to be granted the ALL privilege + # on the source HDFS path and the INSERT privilege on the destination table. + # The following verifies the LOAD DATA statement fails with AuthorizationException + # due to insufficient privileges. + result = self._run_query_as_user(load_data_statement, user, False) + assert authz_err in str(result) + + admin_client.execute("grant all on uri '{0}/{1}' to user {2}" + .format(source_hdfs_dir, file_name, user), user=ADMIN) + # The following verifies the ALL privilege on the source file alone is not + # sufficient to execute the LOAD DATA statement. + result = self._run_query_as_user(load_data_statement, user, False) + assert authz_err in str(result) + + admin_client.execute("grant insert on table {0}.{1} to user {2}" + .format(unique_database, unique_table, user), user=ADMIN) + # Verify the LOAD DATA statement succeeds without AnalysisException. 
+ self._run_query_as_user(load_data_statement, user, True) + finally: + admin_client.execute("revoke all on uri '{0}/{1}' from user {2}" + .format(source_hdfs_dir, file_name, user), user=ADMIN) + admin_client.execute("revoke insert on table {0}.{1} from user {2}" + .format(unique_database, unique_table, user), user=ADMIN) + admin_client.execute("drop database if exists {0} cascade" + .format(unique_database), user=ADMIN) + self.filesystem_client.delete_file_dir("{0}/{1}" + .format(source_hdfs_dir, file_name)) + @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args="{0} {1}".format(IMPALAD_ARGS, "--use_local_catalog=true"),
