This is an automated email from the ASF dual-hosted git repository.

jasonmfehr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 0b9e2b2cd1ea48da676bd09b9c0f30e7d464f84a
Author: Fang-Yu Rao <[email protected]>
AuthorDate: Fri Jan 3 15:43:21 2025 -0800

    IMPALA-13011: Support authorization for Calcite in Impala
    
    This patch adds support for authorization when Calcite is the planner.
    Specifically, this patch focuses on the authorization of table-level
    and column-level privilege requests, including the case when a table
    is a regular view, whether or not the view was created by a superuser. Note
    that CalciteAnalysisDriver would throw an exception from analysis() if
    given a query that requires table masking, i.e., column masking or row
    filtering, since this feature is not yet supported by the Calcite
    planner.
    
    Moreover, we register the VIEW_METADATA privilege for each function
    involved in the given query. We hardcode the database associated with
    the function to 'BuiltinsDb', which is a bit hacky. This hardcoding
    should be removed once each function can be associated with a database
    when we are using the Calcite planner. We may need to change Calcite's
    parser for this.
    
    The issue reported in IMPALA-13767 will be taken care of in a separate
    patch. Until then, this patch could incorrectly register the privilege
    request for a common table expression (CTE) in a WITH clause,
    preventing a legitimate user from executing a query involving CTEs.
    
    Testing:
     - We manually verified that the patch could pass the test cases in
       AuthorizationStmtTest#testPrivilegeRequests() except for
       "with t as (select * from alltypes) select * from t", for which
       the fix will be provided via IMPALA-13767.
     - Added various tests in test_ranger.py.
    
    Change-Id: I9a7f7e4dc9a86a2da9e387832e552538e34029c1
    Reviewed-on: http://gerrit.cloudera.org:8080/22716
    Reviewed-by: Riza Suminto <[email protected]>
    Reviewed-by: Michael Smith <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../java/org/apache/impala/analysis/Analyzer.java  |   4 +
 .../apache/impala/analysis/ParsedStatement.java    |   6 +
 .../impala/analysis/ParsedStatementImpl.java       |   6 +
 .../apache/impala/analysis/StmtMetadataLoader.java |  20 +-
 .../authorization/BaseAuthorizationChecker.java    |   2 +-
 .../apache/impala/calcite/schema/CalciteDb.java    |   9 +-
 .../impala/calcite/schema/ImpalaViewTable.java     |  46 ++--
 .../calcite/service/CalciteAnalysisDriver.java     | 114 ++++++++-
 .../calcite/service/CalciteParsedStatement.java    |  12 +-
 .../calcite/service/ImpalaSqlValidatorImpl.java    | 127 ++++++++++
 tests/authorization/test_ranger.py                 | 271 +++++++++++++++++++++
 11 files changed, 576 insertions(+), 41 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java 
b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 6e89cdaae..1fee13f8d 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -4183,6 +4183,10 @@ public class Analyzer {
     authErrorMsg_ = errMsg;
   }
 
+  public void unsetMaskPrivChecks() {
+    maskPrivChecks_ = false;
+  }
+
   public void setEnablePrivChecks(boolean value) { enablePrivChecks_ = value; }
   public void setIsStraightJoin() { isStraightJoin_ = true; }
   public boolean isStraightJoin() { return isStraightJoin_; }
diff --git a/fe/src/main/java/org/apache/impala/analysis/ParsedStatement.java 
b/fe/src/main/java/org/apache/impala/analysis/ParsedStatement.java
index 6d89d199b..185c77cb2 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ParsedStatement.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ParsedStatement.java
@@ -19,6 +19,8 @@ package org.apache.impala.analysis;
 
 import java.util.Set;
 
+import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
+
 /**
  * ParsedStatement interface that holds the parsed query statement.
  *
@@ -41,4 +43,8 @@ public interface ParsedStatement {
 
   // returns the sql string (could be rewritten)
   public String toSql();
+
+  // Could be overridden to handle an AuthorizationException thrown for a 
registered
+  // masked privilege request.
+  public void handleAuthorizationException(AnalysisResult analysisResult);
 }
diff --git 
a/fe/src/main/java/org/apache/impala/analysis/ParsedStatementImpl.java 
b/fe/src/main/java/org/apache/impala/analysis/ParsedStatementImpl.java
index faeb1d40a..d3826ca94 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ParsedStatementImpl.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ParsedStatementImpl.java
@@ -76,4 +76,10 @@ public class ParsedStatementImpl implements ParsedStatement {
   public String toSql() {
     return stmt_.toSql();
   }
+
+  @Override
+  public void handleAuthorizationException(
+      AnalysisContext.AnalysisResult analysisResult) {
+    stmt_.handleAuthorizationException(analysisResult);
+  }
 }
diff --git 
a/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java 
b/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
index 8e2751094..01d974aa0 100644
--- a/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
@@ -79,6 +79,10 @@ public class StmtMetadataLoader {
   // Number of catalog topic updates received from the statestore.
   private int numCatalogUpdatesReceived_ = 0;
 
+  // Set to true if the query requires column masking or row filtering.
+  // We should remove this after IMPALA-14295 is resolved.
+  private boolean needsAnyTableMasksInQuery_ = false;
+
   /**
    * Contains all statement-relevant tables and database names as well as the 
latest
    * ImpaladCatalog. An entry in the tables map is guaranteed to point to a 
loaded
@@ -90,12 +94,14 @@ public class StmtMetadataLoader {
     public final FeCatalog catalog;
     public final Set<String> dbs;
     public final Map<TableName, FeTable> tables;
+    public final boolean needsAnyTableMasksInQuery_;
 
     public StmtTableCache(FeCatalog catalog, Set<String> dbs,
-        Map<TableName, FeTable> tables) {
+        Map<TableName, FeTable> tables, boolean needsAnyTableMasksInQuery) {
       this.catalog = Preconditions.checkNotNull(catalog);
       this.dbs = Preconditions.checkNotNull(dbs);
       this.tables = Preconditions.checkNotNull(tables);
+      this.needsAnyTableMasksInQuery_ = needsAnyTableMasksInQuery;
       validate();
     }
 
@@ -182,7 +188,8 @@ public class StmtMetadataLoader {
             loadedOrFailedTbls_.size()));
       }
       
fe_.getImpaladTableUsageTracker().recordTableUsage(loadedOrFailedTbls_.keySet());
-      return new StmtTableCache(catalog, dbs_, loadedOrFailedTbls_);
+      return new StmtTableCache(catalog, dbs_, loadedOrFailedTbls_,
+          needsAnyTableMasksInQuery_);
     }
 
     if (timeline_ != null) timeline_.markEvent("Metadata load started");
@@ -299,7 +306,8 @@ public class StmtMetadataLoader {
       }
     }
     
fe_.getImpaladTableUsageTracker().recordTableUsage(loadedOrFailedTbls_.keySet());
-    return new StmtTableCache(catalog, dbs_, loadedOrFailedTbls_);
+    return new StmtTableCache(catalog, dbs_, loadedOrFailedTbls_,
+        needsAnyTableMasksInQuery_);
   }
 
   /**
@@ -405,11 +413,15 @@ public class StmtMetadataLoader {
         SelectStmt stmt = tableMask.createColumnMaskStmt(
             col.getName(), col.getType(), /*authzCtx*/ null);
         if (stmt == null) continue;
+        needsAnyTableMasksInQuery_ = true;
         tableNames.addAll(collectTableCandidates(stmt));
       }
       // Use authzCtx=null to avoid audits and privilege checks.
       SelectStmt filterStmt = tableMask.createRowFilterStmt(/*authzCtx*/null);
-      if (filterStmt != null) 
tableNames.addAll(collectTableCandidates(filterStmt));
+      if (filterStmt != null) {
+        needsAnyTableMasksInQuery_ = true;
+        tableNames.addAll(collectTableCandidates(filterStmt));
+      }
     }
     return tableNames;
   }
diff --git 
a/fe/src/main/java/org/apache/impala/authorization/BaseAuthorizationChecker.java
 
b/fe/src/main/java/org/apache/impala/authorization/BaseAuthorizationChecker.java
index d43aa76f4..aa8123dcc 100644
--- 
a/fe/src/main/java/org/apache/impala/authorization/BaseAuthorizationChecker.java
+++ 
b/fe/src/main/java/org/apache/impala/authorization/BaseAuthorizationChecker.java
@@ -189,7 +189,7 @@ public abstract class BaseAuthorizationChecker implements 
AuthorizationChecker {
         if (!Strings.isNullOrEmpty(maskedReq.second)) {
           throw new AuthorizationException(maskedReq.second);
         } else {
-          
analysisResult.getStmt().handleAuthorizationException(analysisResult);
+          
analysisResult.getParsedStmt().handleAuthorizationException(analysisResult);
         }
         break;
       } finally {
diff --git 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteDb.java
 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteDb.java
index 3b44036ff..f3013701e 100644
--- 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteDb.java
+++ 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteDb.java
@@ -87,12 +87,11 @@ public class CalciteDb extends AbstractSchema {
       RelDataType rowType = CalciteTable.buildColumnsForRelDataType(feTable);
       JavaTypeFactory typeFactory = (JavaTypeFactory) 
ImpalaTypeSystemImpl.TYPE_FACTORY;
       Type elementType = typeFactory.getJavaClass(rowType);
-      return new ViewTable(elementType, RelDataTypeImpl.proto(rowType),
-          ((FeView) feTable).getQueryStmt().toSql(),
+      return new ImpalaViewTable(elementType,
+          RelDataTypeImpl.proto(rowType), ((FeView) 
feTable).getQueryStmt().toSql(),
           /* schemaPath */ ImmutableList.of(),
-          /* viewPath */
-          ImmutableList.of(
-              feTable.getDb().getName().toLowerCase(), 
feTable.getName().toLowerCase()));
+          /* viewPath */ 
ImmutableList.of(feTable.getDb().getName().toLowerCase(),
+          feTable.getName().toLowerCase()), (FeView) feTable);
     }
 
     public CalciteDb build() {
diff --git a/fe/src/main/java/org/apache/impala/analysis/ParsedStatement.java 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/ImpalaViewTable.java
similarity index 53%
copy from fe/src/main/java/org/apache/impala/analysis/ParsedStatement.java
copy to 
java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/ImpalaViewTable.java
index 6d89d199b..4b78ec2dc 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ParsedStatement.java
+++ 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/ImpalaViewTable.java
@@ -15,30 +15,24 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.impala.analysis;
-
-import java.util.Set;
-
-/**
- * ParsedStatement interface that holds the parsed query statement.
- *
- * This is currently implemented by the original Impala parser and the
- * Calcite parser.
- */
-public interface ParsedStatement {
-
-  // Retrieve all the tables/views referenced in the parsed query.
-  public Set<TableName> getTablesInQuery(StmtMetadataLoader loader);
-
-  // return the wrapped statement object.
-  public Object getTopLevelNode();
-
-  // true if this is an explain statement
-  public boolean isExplain();
-
-  // true if this is a query (select) statement
-  public boolean isQueryStmt();
-
-  // returns the sql string (could be rewritten)
-  public String toSql();
+package org.apache.impala.calcite.schema;
+
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.impl.ViewTable;
+import org.apache.impala.catalog.FeView;
+
+import java.lang.reflect.Type;
+import java.util.List;
+
+public class ImpalaViewTable extends ViewTable {
+  private final FeView table_;
+  public ImpalaViewTable(Type elementType, RelProtoDataType rowType, String 
viewSql,
+      List<String> schemaPath, List<String> viewPath, FeView feTable) {
+    super(elementType, rowType, viewSql, schemaPath, viewPath);
+    this.table_ = feTable;
+  }
+
+  public FeView getFeView() {
+    return table_;
+  }
 }
diff --git 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteAnalysisDriver.java
 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteAnalysisDriver.java
index 32634cd11..05cb65722 100644
--- 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteAnalysisDriver.java
+++ 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteAnalysisDriver.java
@@ -39,16 +39,25 @@ import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.ParsedStatement;
 import org.apache.impala.analysis.StmtMetadataLoader;
 import org.apache.impala.analysis.StmtMetadataLoader.StmtTableCache;
+import org.apache.impala.analysis.TableName;
 import org.apache.impala.authorization.AuthorizationContext;
 import org.apache.impala.authorization.AuthorizationFactory;
+import org.apache.impala.authorization.Privilege;
+import org.apache.impala.authorization.PrivilegeRequestBuilder;
 import org.apache.impala.calcite.operators.ImpalaOperatorTable;
 import org.apache.impala.calcite.schema.ImpalaCalciteCatalogReader;
 import org.apache.impala.calcite.type.ImpalaTypeCoercionFactory;
 import org.apache.impala.calcite.type.ImpalaTypeSystemImpl;
 import org.apache.impala.calcite.util.SimplifiedAnalyzer;
 import org.apache.impala.calcite.validate.ImpalaConformance;
+import org.apache.impala.catalog.FeCatalog;
+import org.apache.impala.catalog.FeDb;
+import org.apache.impala.catalog.FeTable;
+import org.apache.impala.catalog.FeView;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaException;
+import org.apache.impala.common.ParseException;
+import org.apache.impala.common.UnsupportedFeatureException;
 import org.apache.impala.planner.PlannerContext;
 import org.apache.impala.planner.SingleNodePlannerIntf;
 import org.apache.impala.thrift.TQueryCtx;
@@ -70,7 +79,7 @@ public class CalciteAnalysisDriver implements AnalysisDriver {
 
   public RelDataTypeFactory typeFactory_;
 
-  public SqlValidator sqlValidator_;
+  public ImpalaSqlValidatorImpl sqlValidator_;
 
   // CalciteCatalogReader is a context class that holds global information that
   // may be needed by the CalciteTable object
@@ -99,6 +108,11 @@ public class CalciteAnalysisDriver implements 
AnalysisDriver {
   @Override
   public AnalysisResult analyze() {
     try {
+      if (stmtTableCache_.needsAnyTableMasksInQuery_) {
+        throw new UnsupportedFeatureException("Column masking and row 
filtering are " +
+            "not yet supported by the Calcite planner.");
+      }
+
       reader_ = 
CalciteMetadataHandler.createCalciteCatalogReader(stmtTableCache_,
           queryCtx_, queryCtx_.session.database);
       // When CalciteRelNodeConverter#convert() is called to convert the valid 
AST into a
@@ -112,7 +126,7 @@ public class CalciteAnalysisDriver implements 
AnalysisDriver {
           stmtTableCache_, analyzer_);
 
       typeFactory_ = new JavaTypeFactoryImpl(new ImpalaTypeSystemImpl());
-      sqlValidator_ = SqlValidatorUtil.newValidator(
+      sqlValidator_ = new ImpalaSqlValidatorImpl(
           ImpalaOperatorTable.getInstance(),
           reader_, typeFactory_,
           SqlValidator.Config.DEFAULT
@@ -121,8 +135,24 @@ public class CalciteAnalysisDriver implements 
AnalysisDriver {
               .withIdentifierExpansion(true)
               .withConformance(ImpalaConformance.INSTANCE)
               .withTypeCoercionEnabled(true)
-              .withTypeCoercionFactory(new ImpalaTypeCoercionFactory())
-              );
+              .withTypeCoercionFactory(new ImpalaTypeCoercionFactory()),
+          analyzer_);
+
+      // Register the privilege requests for the tables directly referenced in 
the given
+      // query as well as those for the tables and columns referenced by the 
views.
+      // We do not pass stmtTableCache_.tables.keySet() to 
registerPrivReqsInTables()
+      // because given a table from stmtTableCache_.tables.keySet(), we do not 
know
+      // whether the table is referenced directly in the query or is 
referenced by a view
+      // in the query.
+      registerPrivReqsInTables(parsedStmt_.getTablesInQuery(/* loader */ null),
+          /* shouldMaskPrivChecks */ false, stmtTableCache_.catalog, 
sqlValidator_);
+
+      // sqlValidator_.validate() would also register privilege requests for 
the columns
+      // directly referenced in the query. We do this after the above call 
because in the
+      // case when some columns directly referenced in the query could not be 
resolved,
+      // we still need to register the privilege requests for tables and 
columns
+      // referenced by views assuming that those tables and columns referenced 
by views
+      // could be successfully resolved.
       validatedNode_ = sqlValidator_.validate(parsedStmt_.getParsedSqlNode());
       return new CalciteAnalysisResult(this);
     } catch (ImpalaException e) {
@@ -162,4 +192,80 @@ public class CalciteAnalysisDriver implements 
AnalysisDriver {
   public ParsedStatement getParsedStmt() {
     return parsedStmt_;
   }
+
+  /**
+   * This method registers the privilege requests for the tables referenced in 
the given
+   * query as well as the tables and columns referenced by the views in the 
given query.
+   */
+  private void registerPrivReqsInTables(Set<TableName> tableNamesInQuery,
+      boolean shouldMaskPrivChecks, FeCatalog catalog, ImpalaSqlValidatorImpl 
validator)
+      throws ParseException {
+
+    for (TableName tableName : tableNamesInQuery) {
+      FeTable feTable = registerTablePrivReq(tableName, catalog);
+
+      if (feTable instanceof FeView) {
+        String sql = ((FeView) feTable).getQueryStmt().toSql();
+        CalciteQueryParser queryParser = new CalciteQueryParser(sql);
+        SqlNode parsedSqlNode = queryParser.parse();
+        CalciteMetadataHandler.TableVisitor tableVisitor =
+            new CalciteMetadataHandler.TableVisitor(/* currentDb */ "default");
+        parsedSqlNode.accept(tableVisitor);
+
+        boolean childViewCreatedBySuperuser =
+            !PrivilegeRequestBuilder.isViewCreatedByNonSuperuser(feTable);
+        // Recall that 'shouldMaskPrivChecks' denotes whether we should mask 
the
+        // privilege requests during privilege request registration for the 
given
+        // TableName's. We should mask the privilege requests for a child view 
if
+        // 'shouldMaskPrivChecks' is true, or the child view was created by a 
superuser.
+        // This matches what the classic Impala frontend does. Refer to
+        // InlineViewRef#analyze() for what the classic Impala frontend does 
to analyze
+        // a regular view (or a view whose isCatalogView() evaluates to true) 
and to
+        // IMPALA-10122 (Part 2) for further details.
+        if (shouldMaskPrivChecks || childViewCreatedBySuperuser) {
+          // This sets 'maskPrivChecks_' of 'analyzer_' to true so that during 
the
+          // following calls to validator.validate() and 
registerTablePrivReq() in the
+          // recursive call, the privilege requests would be registered as 
masked ones,
+          // which allows the requesting user to SELECT the requested columns 
and tables
+          // even though the requesting user does not have the SELECT 
privilege on them.
+          analyzer_.setMaskPrivChecks(null);
+        }
+        // Register privilege requests for columns referenced by the child 
view.
+        validator.validate(parsedSqlNode);
+
+        // Recurse if 'feTable' is also a view. Note that the privilege 
requests for the
+        // tables referenced by 'feTable' will be registered within the 
recursive call.
+        registerPrivReqsInTables(tableVisitor.tableNames_,
+            shouldMaskPrivChecks || childViewCreatedBySuperuser, catalog, 
validator);
+
+        // Set 'maskPrivChecks_' back to false in this case because we do not 
know if
+        // other child views were created by a superuser too.
+        if (!shouldMaskPrivChecks && childViewCreatedBySuperuser) {
+          analyzer_.unsetMaskPrivChecks();
+        }
+      }
+    }
+  }
+
+  private FeTable registerTablePrivReq(TableName tableName, FeCatalog catalog) 
{
+    FeTable feTable = getFeTable(tableName, catalog);
+    if (feTable == null) {
+      analyzer_.registerPrivReq(builder -> {
+        builder.onTableUnknownOwner(tableName.getDb(), tableName.getTbl())
+            .allOf(Privilege.SELECT);
+        return builder.build();
+      });
+    } else {
+      analyzer_.registerAuthAndAuditEvent(feTable, Privilege.SELECT);
+    }
+    return feTable;
+  }
+
+  private FeTable getFeTable(TableName tableName, FeCatalog catalog) {
+    FeDb db = catalog.getDb(tableName.getDb());
+    if (db == null) {
+      return null;
+    }
+    return db.getTable(tableName.getTbl());
+  }
 }
diff --git 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteParsedStatement.java
 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteParsedStatement.java
index 16bd759e7..bd2e75894 100644
--- 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteParsedStatement.java
+++ 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteParsedStatement.java
@@ -26,6 +26,7 @@ import org.apache.impala.analysis.AnalysisDriver;
 import org.apache.impala.analysis.ParsedStatement;
 import org.apache.impala.analysis.TableName;
 import org.apache.impala.analysis.StmtMetadataLoader;
+import org.apache.impala.catalog.FeCatalog;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.thrift.TQueryCtx;
 
@@ -82,7 +83,16 @@ public class CalciteParsedStatement implements 
ParsedStatement {
     return sql_;
   }
 
+  @Override
+  public void handleAuthorizationException(
+      AnalysisContext.AnalysisResult analysisResult) {
+    // Do nothing.
+  }
+
   public SqlNode getParsedSqlNode() {
-    return parsedNode_;
+    // Return a clone of 'parsedNode_' to maintain the immutability of 
'parsedNode_'.
+    // This prevents other instances, e.g.,
+    // 'sqlValidator_' in CalciteAnalysisDriver#analyze() from modifying 
'parsedNode_'.
+    return parsedNode_.clone(parsedNode_);
   }
 }
diff --git 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/ImpalaSqlValidatorImpl.java
 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/ImpalaSqlValidatorImpl.java
new file mode 100644
index 000000000..43e908979
--- /dev/null
+++ 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/ImpalaSqlValidatorImpl.java
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.calcite.service;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.calcite.prepare.RelOptTableImpl;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.validate.SqlNameMatcher;
+import org.apache.calcite.sql.validate.SqlQualified;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql.validate.SqlValidatorCatalogReader;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+import org.apache.calcite.sql.validate.SqlValidatorNamespace;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.sql.validate.SqlValidatorScope.Resolve;
+import org.apache.calcite.sql.validate.SqlValidatorScope.ResolvedImpl;
+import org.apache.calcite.sql.validate.SqlValidatorTable;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.impala.analysis.Analyzer;
+import org.apache.impala.authorization.Privilege;
+import org.apache.impala.calcite.schema.CalciteTable;
+import org.apache.impala.calcite.schema.ImpalaViewTable;
+import org.apache.impala.catalog.BuiltinsDb;
+import org.apache.impala.catalog.FeView;
+import org.apache.impala.catalog.FeFsTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The ImpalaSqlValidatorImpl is responsible for registering column-level and
+ * function-level privilege requests in the given query. The methods in the 
class will be
+ * invoked via SqlValidatorImpl#validate(SqlNode topNode).
+ */
+public class ImpalaSqlValidatorImpl extends SqlValidatorImpl {
+
+  private Analyzer analyzer_;
+
+  protected static final Logger LOG =
+      LoggerFactory.getLogger(ImpalaSqlValidatorImpl.class.getName());
+
+  public ImpalaSqlValidatorImpl(SqlOperatorTable opTab,
+      SqlValidatorCatalogReader catalogReader, RelDataTypeFactory typeFactory,
+      SqlValidator.Config config, Analyzer analyzer) {
+    super(opTab, catalogReader, typeFactory, config);
+    analyzer_ = analyzer;
+  }
+
+  @Override public void validateIdentifier(SqlIdentifier id, SqlValidatorScope 
scope) {
+    super.validateIdentifier(id, scope);
+    // It is a bit inefficient to execute scope.fullyQualify() again because 
this has
+    // been done within super.validateIdentifier(id, scope). But since we need 
'fqId'
+    // to register the privilege request, we have to do this for now. We 
created
+    // CALCITE-7152 to keep track of this.
+    SqlQualified fqId = scope.fullyQualify(id);
+
+    SqlIdentifier prefix = id.getComponent(0, 1);
+    SqlNameMatcher nameMatcher = this.getCatalogReader().nameMatcher();
+    ResolvedImpl resolved = new ResolvedImpl();
+    SqlValidatorNamespace fromNs;
+    scope.resolve(prefix.names, nameMatcher, false, resolved);
+    if (resolved.count() == 1) {
+      Resolve resolve = resolved.only();
+      fromNs = resolve.namespace;
+      SqlValidatorTable validatorTable = fromNs.getTable();
+
+      if (validatorTable instanceof CalciteTable) {
+        FeFsTable feFsTable = ((CalciteTable) validatorTable).getFeFsTable();
+        this.analyzer_.registerPrivReq(
+            builder -> builder.allOf(Privilege.SELECT)
+                .onColumn(feFsTable.getDb().getName(),
+                    feFsTable.getTableName().getTbl(), 
fqId.identifier.names.get(1),
+                    feFsTable.getOwnerUser()).build()
+        );
+        return;
+      }
+
+      // A view would be an instance of RelOptTableImpl if the view was 
created via
+      // CalciteDb.Builder#createViewTable().
+      if (validatorTable instanceof RelOptTableImpl) {
+        Preconditions.checkState(validatorTable.table() instanceof 
ImpalaViewTable);
+        FeView view = ((ImpalaViewTable) validatorTable.table()).getFeView();
+        this.analyzer_.registerPrivReq(
+            builder -> builder.allOf(Privilege.SELECT)
+                .onColumn(view.getDb().getName(),
+                    view.getTableName().getTbl(), fqId.identifier.names.get(1),
+                    view.getOwnerUser()).build()
+        );
+      }
+    }
+  }
+
+  @Override public void validateCall(
+      SqlCall call,
+      SqlValidatorScope scope) {
+    super.validateCall(call, scope);
+
+    SqlOperator operator = call.getOperator();
+    // It's a bit hacky to assume each SqlFunction is associated with 
BuiltinsDb. Ideally
+    // a function could be associated with any database. IMPALA-13095 was 
created to keep
+    // track of this.
+    if (operator instanceof SqlFunction) {
+      this.analyzer_.registerPrivReq(
+          builder -> builder.allOf(Privilege.VIEW_METADATA)
+              .onDb(BuiltinsDb.getInstance().getName(), null).build());
+    }
+  }
+}
diff --git a/tests/authorization/test_ranger.py 
b/tests/authorization/test_ranger.py
index 17ce48d4c..aef153fd1 100644
--- a/tests/authorization/test_ranger.py
+++ b/tests/authorization/test_ranger.py
@@ -1420,6 +1420,257 @@ class TestRanger(CustomClusterTestSuite):
           .format(unique_database))
       TestRanger._remove_policy(unique_name)
 
+  def _test_select_calcite_frontend(self, unique_name):
+    grantee_user = "non_owner"
+    with self.create_impala_client(user=ADMIN) as admin_client, \
+        self.create_impala_client(user=grantee_user) as non_owner_client:
+      # Set the query option of 'use_calcite_planner' to 1 to use the Calcite planner.
+      non_owner_client.set_configuration({"use_calcite_planner": 1})
+      unique_database = unique_name + "_db"
+      unique_table = unique_name + "_tbl"
+      test_select_query = "select * from {0}.{1}".format(unique_database, unique_table)
+      error_msg_prefix = "User '{0}' does not have privileges to execute 'SELECT' on:" \
+          .format(grantee_user)
+      try:
+        # Set up a temporary database and a table.
+        admin_client.execute("drop database if exists {0} cascade"
+            .format(unique_database))
+        admin_client.execute("create database {0}".format(unique_database))
+        admin_client.execute("create table {0}.{1} (id int, bigint_col bigint)"
+            .format(unique_database, unique_table))
+        admin_client.execute("grant select on table functional.alltypes to user {0}"
+            .format(grantee_user))
+        admin_client.execute("refresh authorization")
+
+        # Even though the user 'grantee_user' was granted the SELECT privilege on the
+        # table 'functional.alltypes', the user still could not execute the query due to
+        # IMPALA-13767.
+        result = self.execute_query_expect_failure(non_owner_client,
+            "with t as (select * from functional.alltypes) select * from t")
+        assert "{0} default.t".format(error_msg_prefix) in str(result)
+
+        admin_client.execute("grant select (id) on table {0}.{1} to user {2}"
+            .format(unique_database, unique_table, grantee_user))
+        admin_client.execute("refresh authorization")
+
+        # Even though 'grantee_user' was already granted the SELECT privilege on the
+        # column 'id', 'grantee_user' still could not execute the query because the user
+        # does not have the SELECT privilege on the table, or the SELECT privilege on the
+        # column 'bigint_col'.
+        result = self.execute_query_expect_failure(non_owner_client, test_select_query)
+        assert "{0} {1}.{2}" \
+            .format(error_msg_prefix, unique_database, unique_table) in str(result)
+
+        admin_client.execute("grant select (bigint_col) on table {0}.{1} to user {2}"
+            .format(unique_database, unique_table, grantee_user))
+        admin_client.execute("refresh authorization")
+
+        # After 'grantee_user' is granted the SELECT privilege on the column
+        # 'bigint_col', the query could be executed.
+        non_owner_client.execute(test_select_query)
+      finally:
+        admin_client.execute("revoke select on table functional.alltypes from user {0}"
+            .format(grantee_user))
+        admin_client.execute("revoke select (id) on table {0}.{1} from user {2}"
+            .format(unique_database, unique_table, grantee_user))
+        admin_client.execute("revoke select (bigint_col) on table {0}.{1} from user {2}"
+            .format(unique_database, unique_table, grantee_user))
+        admin_client.execute("refresh authorization")
+        admin_client.execute("drop database if exists {0} cascade"
+            .format(unique_database))
+
+  def _test_view_on_view(self, use_calcite_planner,
+      v2_created_by_non_superuser, v1_created_by_non_superuser,
+      v2_name, v1_name,
+      # This denotes the columns on which the requesting user has to have the SELECT
+      # privilege in order to execute the SELECT query against the view v2.
+      priv_req_columns,
+      # This denotes the tables for which a frontend registers the masked privilege
+      # requests.
+      priv_req_masked_tables,
+      # This denotes the columns for which a frontend registers the masked privilege
+      # requests.
+      priv_req_masked_columns):
+    grantee_user = "non_owner"
+    with self.create_impala_client(user=ADMIN) as admin_client, \
+        self.create_impala_client(user=grantee_user) as non_owner_client:
+      if use_calcite_planner is True:
+        # Set the query option of 'use_calcite_planner' to 1 to use the Calcite planner.
+        non_owner_client.set_configuration({"use_calcite_planner": 1})
+      test_select_query = "select * from {0}".format(v2_name)
+      select_error_prefix = "User '{0}' does not have privileges to execute " \
+          "'SELECT' on:".format(grantee_user)
+      profile_error = "User {0} is not authorized to access the runtime profile " \
+          "or execution summary.".format(grantee_user)
+      try:
+        # Add the table property to simulate views created by a non-superuser.
+        if v2_created_by_non_superuser is True:
+          self.run_stmt_in_hive("alter view {0} set tblproperties "
+              "('Authorized' = 'false')".format(v2_name))
+          admin_client.execute("invalidate metadata {0}".format(v2_name))
+        if v1_created_by_non_superuser is True:
+          self.run_stmt_in_hive("alter view {0} set tblproperties "
+              "('Authorized' = 'false')".format(v1_name))
+          admin_client.execute("invalidate metadata {0}".format(v1_name))
+
+        result = self.execute_query_expect_failure(non_owner_client, test_select_query)
+        assert "{0} {1}" \
+            .format(select_error_prefix, v2_name) in str(result)
+
+        for column in priv_req_columns:
+          admin_client.execute("grant select ({0}) on table {1} to user {2}"
+              .format(column[1], column[0], grantee_user))
+        admin_client.execute("refresh authorization")
+        # Recall that by default the Impala client would always attempt to retrieve the
+        # runtime profile after query execution. The following verifies except for the
+        # case in which both views were created by a non-superuser, 'grantee_user' will
+        # not be able to retrieve the runtime profile even though the SELECT query could
+        # be executed. Note that no masked privilege request would be registered when
+        # both views were created by a non-superuser.
+        if v2_created_by_non_superuser is False or v1_created_by_non_superuser is False:
+          result = self.execute_query_expect_failure(non_owner_client, test_select_query)
+          assert profile_error in str(result)
+        else:
+          non_owner_client.execute(test_select_query)
+
+        # Once we grant to 'grantee_user' the privileges on the tables and columns for
+        # which the masked privilege requests were registered, query could be executed
+        # and the runtime profile could be retrieved.
+        for table in priv_req_masked_tables:
+          admin_client.execute("grant select on table {0} to user {1}"
+              .format(table, grantee_user))
+        for column in priv_req_masked_columns:
+          admin_client.execute("grant select ({0}) on table {1} to user {2}"
+              .format(column[1], column[0], grantee_user))
+        admin_client.execute("refresh authorization")
+        non_owner_client.execute(test_select_query)
+      finally:
+        for column in priv_req_columns:
+          admin_client.execute("revoke select ({0}) on table {1} from user {2}"
+              .format(column[1], column[0], grantee_user))
+        if v2_created_by_non_superuser is True:
+          self.run_stmt_in_hive("alter view {0} unset tblproperties ('Authorized')"
+              .format(v2_name))
+          admin_client.execute("invalidate metadata {0}".format(v2_name))
+        if v1_created_by_non_superuser is True:
+          self.run_stmt_in_hive("alter view {0} unset tblproperties ('Authorized')"
+              .format(v1_name))
+          admin_client.execute("invalidate metadata {0}".format(v1_name))
+        for table in priv_req_masked_tables:
+          admin_client.execute("revoke select on table {0} from user {1}"
+              .format(table, grantee_user))
+        for column in priv_req_masked_columns:
+          admin_client.execute("revoke select ({0}) on table {1} from user {2}"
+              .format(column[1], column[0], grantee_user))
+        admin_client.execute("refresh authorization")
+
+  def _test_view_on_view_all_configs(self, unique_name):
+    """
+    This verifies 4 possible combinations of the table property of 'Authorized' when a
+    view (v2) is defined on top of another view (v1) for both the classic planner and the
+    Calcite planner.
+    The expected behavior could be summarized as follows.
+    1. In order to execute the SELECT query against v2, the requesting user always has
+       to have the SELECT privilege on v2, or the SELECT privilege on the selected column
+       in v2.
+    2. For a view v that was created by a superuser, i.e., v does not have the table
+       property of 'Authorized' set to "false", privilege requests for the underlying
+       tables, views, and the columns of v would be registered as the masked ones,
+       meaning that the requesting user does not have to be granted the privileges on
+       those referenced resources by v to execute a query against v.
+    3. However, if there is any registered masked privilege request for any table or
+       column on which the requesting user does not have the privilege, the requesting
+       user will not be able to retrieve the runtime profile.
+    """
+    admin_client = self.create_impala_client(user=ADMIN)
+    unique_database = unique_name + "_db"
+    unique_table = unique_database + "." + unique_name + "_tbl"
+    unique_view_1 = unique_database + "." + unique_name + "_v1"
+    unique_view_2 = unique_database + "." + unique_name + "_v2"
+    planner_options = [True, False]
+    try:
+      # Set up a temporary database, a table, and 2 views.
+      admin_client.execute("drop database if exists {0} cascade"
+          .format(unique_database))
+      admin_client.execute("create database {0}".format(unique_database))
+      admin_client.execute("create table {0} (id int)".format(unique_table))
+      admin_client.execute("create view {0} as select * from {1}"
+          .format(unique_view_1, unique_table))
+      admin_client.execute("create view {0} as select * from {1}"
+          .format(unique_view_2, unique_view_1))
+
+      for use_calcite_planner in planner_options:
+        self._test_view_on_view(use_calcite_planner, False, False,
+            unique_view_2, unique_view_1,
+            [(unique_view_2, "id")],
+            [unique_view_1, unique_table],
+            [(unique_view_1, "id"), (unique_table, "id")])
+
+        self._test_view_on_view(use_calcite_planner, False, True,
+            unique_view_2, unique_view_1,
+            [(unique_view_2, "id")],
+            [unique_view_1, unique_table],
+            [(unique_view_1, "id"), (unique_table, "id")])
+
+        self._test_view_on_view(use_calcite_planner, True, False,
+            unique_view_2, unique_view_1,
+            [(unique_view_2, "id"), (unique_view_1, "id")],
+            [unique_table],
+            [(unique_table, "id")])
+
+        self._test_view_on_view(use_calcite_planner, True, True,
+            unique_view_2, unique_view_1,
+            [(unique_view_2, "id"), (unique_view_1, "id"), (unique_table, "id")],
+            [],
+            [])
+
+    finally:
+      admin_client.execute("drop database if exists {0} cascade".format(unique_database))
+
+  def _test_table_masking_calcite_frontend(self, unique_name):
+    """
+    This verifies table masking is not yet supported by the Calcite planner.
+    """
+    grantee_user = "non_owner"
+    with self.create_impala_client(user=ADMIN) as admin_client, \
+        self.create_impala_client(user=grantee_user) as non_owner_client:
+      non_owner_client.set_configuration({"use_calcite_planner": 1})
+      database = "functional"
+      table_1 = "alltypes"
+      table_2 = "alltypestiny"
+      test_select_query_1 = "select id from {0}.{1}".format(database, table_1)
+      test_select_query_2 = "select id from {0}.{1}".format(database, table_2)
+      select_error = "UnsupportedFeatureException: Column masking and row filtering " \
+          "are not yet supported by the Calcite planner."
+
+      policy_cnt = 0
+      try:
+        TestRanger._add_column_masking_policy(
+          unique_name + str(policy_cnt), grantee_user, database, table_1, "id",
+          "CUSTOM", "id * 100")
+        policy_cnt += 1
+
+        admin_client.execute("grant select (id) on table {0}.{1} to user {2}"
+            .format(database, table_1, grantee_user))
+        result = self.execute_query_expect_failure(non_owner_client, test_select_query_1)
+        assert select_error in str(result)
+
+        TestRanger._add_row_filtering_policy(
+          unique_name + str(policy_cnt), grantee_user, database, table_2, "id % 2 = 0")
+        policy_cnt += 1
+
+        admin_client.execute("grant select (id) on table {0}.{1} to user {2}"
+            .format(database, table_2, grantee_user))
+        result = self.execute_query_expect_failure(non_owner_client, test_select_query_2)
+        assert select_error in str(result)
+      finally:
+        admin_client.execute("revoke select (id) on table {0}.{1} from user {2}"
+            .format(database, table_1, grantee_user))
+        admin_client.execute("revoke select (id) on table {0}.{1} from user {2}"
+            .format(database, table_2, grantee_user))
+        for i in range(policy_cnt):
+          TestRanger._remove_policy(unique_name + str(i))
+
 
 class TestRangerIndependent(TestRanger):
   """
@@ -3361,3 +3612,23 @@ class TestRangerIcebergRestCatalog(TestRanger):
       finally:
         self._remove_policy("column-masking-for-airports_parquet")
         self._remove_policy("row-filtering-for-airports_parquet")
+
+
[email protected]_args(
+    start_args="--env_vars=USE_CALCITE_PLANNER=true",
+    impalad_args=IMPALAD_ARGS,
+    catalogd_args=CATALOGD_ARGS)
+class TestRangerWithCalcite(TestRanger):
+  """
+  Tests for verifying the behavior of the Calcite planner with respect to
+  authorization via the Ranger server.
+  """
+
+  def test_select_calcite_frontend(self, unique_name):
+    self._test_select_calcite_frontend(unique_name)
+
+  def test_view_on_view_all_configs(self, unique_name):
+    self._test_view_on_view_all_configs(unique_name)
+
+  def test_table_masking_calcite_frontend(self, unique_name):
+    self._test_table_masking_calcite_frontend(unique_name)


Reply via email to