yuqi1129 commented on code in PR #4515:
URL: https://github.com/apache/gravitino/pull/4515#discussion_r1721304932


##########
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationPlugin.java:
##########
@@ -0,0 +1,1023 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization.ranger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.errorprone.annotations.FormatMethod;
+import com.google.errorprone.annotations.FormatString;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.authorization.Group;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.Privilege;
+import org.apache.gravitino.authorization.Role;
+import org.apache.gravitino.authorization.RoleChange;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.authorization.User;
+import org.apache.gravitino.authorization.ranger.defines.VXGroup;
+import org.apache.gravitino.authorization.ranger.defines.VXGroupList;
+import org.apache.gravitino.authorization.ranger.defines.VXUser;
+import org.apache.gravitino.authorization.ranger.defines.VXUserList;
+import org.apache.gravitino.connector.AuthorizationPropertiesMeta;
+import org.apache.gravitino.connector.authorization.AuthorizationPlugin;
+import org.apache.gravitino.exceptions.AuthorizationPluginException;
+import org.apache.ranger.RangerServiceException;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.apache.ranger.plugin.model.RangerRole;
+import org.apache.ranger.plugin.util.GrantRevokeRoleRequest;
+import org.apache.ranger.plugin.util.SearchFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Ranger authorization operations plugin abstract class. */
+public abstract class RangerAuthorizationPlugin implements AuthorizationPlugin 
{
+  private static final Logger LOG = 
LoggerFactory.getLogger(RangerAuthorizationPlugin.class);
+
+  protected String catalogProvider;
+  protected RangerClientExt rangerClient;
+  protected String rangerServiceName;
+  /** Mapping Gravitino privilege name to the underlying authorization system 
privileges. */
+  protected Map<Privilege.Name, Set<String>> mapPrivileges = null;
+  // The owner privileges, the owner can do anything on the metadata object
+  protected Set<String> ownerPrivileges = null;
+
+  /**
+   * Because Ranger doesn't support the precise filter, Ranger will return the 
policy meets the
+   * wildcard(*,?) conditions, just like `*.*.*` policy will match 
`db1.table1.column1` So we need
+   * to manual precise filter the policies.
+   */
+  // Search Ranger policy filter keys
+  protected List<String> policyFilterKeys = null;
+  // Search Ranger policy precise filter keys
+  protected List<String> policyPreciseFilterKeys = null;
+
+  public static final String MANAGED_BY_GRAVITINO = "MANAGED_BY_GRAVITINO";
+
+  // TODO: Maybe need to move to the configuration in the future
+  public static final String RANGER_ADMIN_NAME = "admin";
+
+  public RangerAuthorizationPlugin(String catalogProvider, Map<String, String> 
config) {
+    super();
+    this.catalogProvider = catalogProvider;
+    String rangerUrl = 
config.get(AuthorizationPropertiesMeta.RANGER_ADMIN_URL);
+    String authType = config.get(AuthorizationPropertiesMeta.RANGER_AUTH_TYPE);
+    String username = config.get(AuthorizationPropertiesMeta.RANGER_USERNAME);
+    // Apache Ranger Password should be minimum 8 characters with min one 
alphabet and one numeric.
+    String password = config.get(AuthorizationPropertiesMeta.RANGER_PASSWORD);
+    rangerServiceName = 
config.get(AuthorizationPropertiesMeta.RANGER_SERVICE_NAME);
+    check(rangerUrl != null, "Ranger admin URL is required");
+    check(authType != null, "Ranger auth type is required");
+    check(username != null, "Ranger username is required");
+    check(password != null, "Ranger password is required");
+    check(rangerServiceName != null, "Ranger service name is required");
+
+    rangerClient = new RangerClientExt(rangerUrl, authType, username, 
password);
+
+    initMapPrivileges();
+    initOwnerPrivileges();
+    initPolicyFilterKeys();
+    initPreciseFilterKeys();
+  }
+
+  /**
+   * Different underlying permission system may have different privilege 
names, this function is
+   * used to initialize the privilege mapping.
+   */
+  protected abstract void initMapPrivileges();
+
+  /**
+   * Different underlying permission system may have different owner privilege 
names, this function
+   * is used to initialize the owner privilege mapping.
+   */
+  protected abstract void initOwnerPrivileges();
+
+  // Initial Ranger policy filter keys
+  protected abstract void initPolicyFilterKeys();
+  // Initial Ranger policy precise filter keys
+  protected abstract void initPreciseFilterKeys();
+
+  /**
+   * Translate the privilege name to the corresponding privilege name in the 
underlying permission
+   *
+   * @param name The privilege name to translate
+   * @return The corresponding privilege name in the underlying permission 
system
+   */
+  public Set<String> translatePrivilege(Privilege.Name name) {
+    return mapPrivileges.get(name);
+  }
+
+  /**
+   * Whether this privilege is underlying permission system supported
+   *
+   * @param name The privilege name to check
+   * @return true if the privilege is supported, otherwise false
+   */
+  protected boolean checkPrivilege(Privilege.Name name) {
+    return mapPrivileges.containsKey(name);
+  }
+
+  @FormatMethod
+  protected void check(boolean condition, @FormatString String message, 
Object... args) {
+    if (!condition) {
+      throw new AuthorizationPluginException(message, args);
+    }
+  }
+
+  @VisibleForTesting
+  public List<String> getOwnerPrivileges() {
+    return Lists.newArrayList(ownerPrivileges);
+  }
+
+  /**
+   * Because Ranger does not have Role concept, Each metadata object will have 
a unique Ranger
+   * policy. we can use one or more Ranger policy to simulate the role. <br>
+   * 1. Create a policy for each metadata object. <br>
+   * 2. Save role name in the Policy properties. <br>
+   * 3. Set `MANAGED_BY_GRAVITINO` label in the policy. <br>
+   * 4. For easy manage, each privilege will create a RangerPolicyItemAccess 
in the policy. <br>
+   * 5. The policy will only have one user, the user is the {OWNER} of the 
policy. <br>
+   * 6. The policy will not have group. <br>
+   */
+  @Override
+  public Boolean onRoleCreated(Role role) throws RuntimeException {
+    createRangerRoleIfNotExists(role.name());
+    return onRoleUpdated(
+        role,
+        role.securableObjects().stream()
+            .map(securableObject -> RoleChange.addSecurableObject(role.name(), 
securableObject))
+            .toArray(RoleChange[]::new));
+  }
+
+  @Override
+  public Boolean onRoleAcquired(Role role) throws RuntimeException {
+    try {
+      return role.securableObjects().stream()
+              .filter(
+                  securableObject -> {
+                    RangerPolicy policy = findManagedPolicy(securableObject);
+                    return policy != null;
+                  })
+              .count()
+          == role.securableObjects().size();

Review Comment:
   ```java
   return role.securableObjects().stream().allMatch(object -> 
findManagedPolicy(object) != null);
   
   ```



##########
api/src/main/java/org/apache/gravitino/authorization/RoleChange.java:
##########
@@ -149,7 +189,101 @@ public int hashCode() {
      */
     @Override
     public String toString() {
-      return "REMOVESECURABLEOBJECT " + securableObject;
+      return "REMOVESECURABLEOBJECT " + roleName + " " + securableObject;
+    }
+  }
+
+  /**
+   * A UpdateSecurableObject is to update securable object's privilege from 
role. <br>
+   * The securable object's metadata entity must be the same as new securable 
object's metadata
+   * entity. <br>
+   * The securable object's privilege must be different as new securable 
object's privilege. <br>
+   */
+  final class UpdateSecurableObject implements RoleChange {
+    private final String roleName;
+    private final SecurableObject securableObject;
+    private final SecurableObject newSecurableObject;
+
+    private UpdateSecurableObject(
+        String roleName, SecurableObject securableObject, SecurableObject 
newSecurableObject) {
+      if (!securableObject.fullName().equals(newSecurableObject.fullName())) {
+        throw new IllegalArgumentException(
+            "The securable object's metadata entity must be same as new 
securable object's metadata entity.");
+      }
+      if 
(securableObject.privileges().containsAll(newSecurableObject.privileges())) {
+        throw new IllegalArgumentException(
+            "The updated securable object's privilege must be different as new 
securable object's privilege.");

Review Comment:
   This place has not been resolved.



##########
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationPlugin.java:
##########
@@ -0,0 +1,1023 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.authorization.ranger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.errorprone.annotations.FormatMethod;
+import com.google.errorprone.annotations.FormatString;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.authorization.Group;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.Privilege;
+import org.apache.gravitino.authorization.Role;
+import org.apache.gravitino.authorization.RoleChange;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
+import org.apache.gravitino.authorization.User;
+import org.apache.gravitino.authorization.ranger.defines.VXGroup;
+import org.apache.gravitino.authorization.ranger.defines.VXGroupList;
+import org.apache.gravitino.authorization.ranger.defines.VXUser;
+import org.apache.gravitino.authorization.ranger.defines.VXUserList;
+import org.apache.gravitino.connector.AuthorizationPropertiesMeta;
+import org.apache.gravitino.connector.authorization.AuthorizationPlugin;
+import org.apache.gravitino.exceptions.AuthorizationPluginException;
+import org.apache.ranger.RangerServiceException;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.apache.ranger.plugin.model.RangerRole;
+import org.apache.ranger.plugin.util.GrantRevokeRoleRequest;
+import org.apache.ranger.plugin.util.SearchFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Ranger authorization operations plugin abstract class. */
+public abstract class RangerAuthorizationPlugin implements AuthorizationPlugin 
{
+  private static final Logger LOG = 
LoggerFactory.getLogger(RangerAuthorizationPlugin.class);
+
+  protected String catalogProvider;
+  protected RangerClientExt rangerClient;
+  protected String rangerServiceName;
+  /** Mapping Gravitino privilege name to the underlying authorization system 
privileges. */
+  protected Map<Privilege.Name, Set<String>> mapPrivileges = null;
+  // The owner privileges, the owner can do anything on the metadata object
+  protected Set<String> ownerPrivileges = null;
+
+  /**
+   * Because Ranger doesn't support the precise filter, Ranger will return the 
policy meets the
+   * wildcard(*,?) conditions, just like `*.*.*` policy will match 
`db1.table1.column1` So we need
+   * to manual precise filter the policies.
+   */
+  // Search Ranger policy filter keys
+  protected List<String> policyFilterKeys = null;
+  // Search Ranger policy precise filter keys
+  protected List<String> policyPreciseFilterKeys = null;
+
+  public static final String MANAGED_BY_GRAVITINO = "MANAGED_BY_GRAVITINO";
+
+  // TODO: Maybe need to move to the configuration in the future
+  public static final String RANGER_ADMIN_NAME = "admin";
+
+  public RangerAuthorizationPlugin(String catalogProvider, Map<String, String> 
config) {
+    super();
+    this.catalogProvider = catalogProvider;
+    String rangerUrl = 
config.get(AuthorizationPropertiesMeta.RANGER_ADMIN_URL);
+    String authType = config.get(AuthorizationPropertiesMeta.RANGER_AUTH_TYPE);
+    String username = config.get(AuthorizationPropertiesMeta.RANGER_USERNAME);
+    // Apache Ranger Password should be minimum 8 characters with min one 
alphabet and one numeric.
+    String password = config.get(AuthorizationPropertiesMeta.RANGER_PASSWORD);
+    rangerServiceName = 
config.get(AuthorizationPropertiesMeta.RANGER_SERVICE_NAME);
+    check(rangerUrl != null, "Ranger admin URL is required");
+    check(authType != null, "Ranger auth type is required");
+    check(username != null, "Ranger username is required");
+    check(password != null, "Ranger password is required");
+    check(rangerServiceName != null, "Ranger service name is required");
+
+    rangerClient = new RangerClientExt(rangerUrl, authType, username, 
password);
+
+    initMapPrivileges();
+    initOwnerPrivileges();
+    initPolicyFilterKeys();
+    initPreciseFilterKeys();
+  }
+
+  /**
+   * Different underlying permission system may have different privilege 
names, this function is
+   * used to initialize the privilege mapping.
+   */
+  protected abstract void initMapPrivileges();
+
+  /**
+   * Different underlying permission system may have different owner privilege 
names, this function
+   * is used to initialize the owner privilege mapping.
+   */
+  protected abstract void initOwnerPrivileges();
+
+  // Initial Ranger policy filter keys
+  protected abstract void initPolicyFilterKeys();
+  // Initial Ranger policy precise filter keys
+  protected abstract void initPreciseFilterKeys();
+
+  /**
+   * Translate the privilege name to the corresponding privilege name in the 
underlying permission
+   *
+   * @param name The privilege name to translate
+   * @return The corresponding privilege name in the underlying permission 
system
+   */
+  public Set<String> translatePrivilege(Privilege.Name name) {
+    return mapPrivileges.get(name);
+  }
+
+  /**
+   * Whether this privilege is underlying permission system supported
+   *
+   * @param name The privilege name to check
+   * @return true if the privilege is supported, otherwise false
+   */
+  protected boolean checkPrivilege(Privilege.Name name) {
+    return mapPrivileges.containsKey(name);
+  }
+
+  @FormatMethod
+  protected void check(boolean condition, @FormatString String message, 
Object... args) {
+    if (!condition) {
+      throw new AuthorizationPluginException(message, args);
+    }
+  }
+
+  @VisibleForTesting
+  public List<String> getOwnerPrivileges() {
+    return Lists.newArrayList(ownerPrivileges);
+  }
+
+  /**
+   * Because Ranger does not have Role concept, Each metadata object will have 
a unique Ranger
+   * policy. we can use one or more Ranger policy to simulate the role. <br>
+   * 1. Create a policy for each metadata object. <br>
+   * 2. Save role name in the Policy properties. <br>
+   * 3. Set `MANAGED_BY_GRAVITINO` label in the policy. <br>
+   * 4. For easy manage, each privilege will create a RangerPolicyItemAccess 
in the policy. <br>
+   * 5. The policy will only have one user, the user is the {OWNER} of the 
policy. <br>
+   * 6. The policy will not have group. <br>
+   */
+  @Override
+  public Boolean onRoleCreated(Role role) throws RuntimeException {
+    createRangerRoleIfNotExists(role.name());
+    return onRoleUpdated(
+        role,
+        role.securableObjects().stream()
+            .map(securableObject -> RoleChange.addSecurableObject(role.name(), 
securableObject))
+            .toArray(RoleChange[]::new));
+  }
+
+  @Override
+  public Boolean onRoleAcquired(Role role) throws RuntimeException {
+    try {
+      return role.securableObjects().stream()
+              .filter(
+                  securableObject -> {
+                    RangerPolicy policy = findManagedPolicy(securableObject);
+                    return policy != null;
+                  })
+              .count()
+          == role.securableObjects().size();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Because one Ranger policy maybe contain multiple securable objects, so we 
didn't directly
+   * remove the policy. <br>
+   */
+  @Override
+  public Boolean onRoleDeleted(Role role) throws RuntimeException {
+    // First, remove the role in the Ranger policy
+    onRoleUpdated(
+        role,
+        role.securableObjects().stream()
+            .map(securableObject -> 
RoleChange.removeSecurableObject(role.name(), securableObject))
+            .toArray(RoleChange[]::new));
+    // Lastly, remove the role in the Ranger
+    try {
+      rangerClient.deleteRole(role.name(), RANGER_ADMIN_NAME, 
rangerServiceName);
+    } catch (RangerServiceException e) {
+      // Ignore exception to support idempotent operation
+    }
+    return Boolean.TRUE;
+  }
+
+  @Override
+  public Boolean onRoleUpdated(Role role, RoleChange... changes) throws 
RuntimeException {
+    for (RoleChange change : changes) {
+      boolean execResult;
+      if (change instanceof RoleChange.AddSecurableObject) {
+        execResult = doAddSecurableObject((RoleChange.AddSecurableObject) 
change);
+      } else if (change instanceof RoleChange.RemoveSecurableObject) {
+        execResult =
+            doRemoveSecurableObject(role.name(), 
(RoleChange.RemoveSecurableObject) change);
+      } else if (change instanceof RoleChange.UpdateSecurableObject) {
+        execResult =
+            doUpdateSecurableObject(role.name(), 
(RoleChange.UpdateSecurableObject) change);
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported role change type: "
+                + (change == null ? "null" : 
change.getClass().getSimpleName()));
+      }
+      if (!execResult) {
+        return Boolean.FALSE;
+      }
+    }
+
+    return Boolean.TRUE;
+  }
+
+  /**
+   * Set or transfer the ownership of the metadata object. <br>
+   *
+   * @param metadataObject The metadata object to set the owner.
+   * @param preOwner The previous owner of the metadata object. If the 
metadata object doesn't have
+   *     owner, then the preOwner will be null and newOwner will be not null.
+   * @param newOwner The new owner of the metadata object. If the metadata 
object already have
+   *     owner, then the preOwner and newOwner will not be null.
+   */
+  @Override
+  public Boolean onOwnerSet(MetadataObject metadataObject, Owner preOwner, 
Owner newOwner)
+      throws RuntimeException {
+    // 1. Set the owner of the metadata object
+    // 2. Transfer the ownership from preOwner to newOwner of the metadata 
object
+    check(newOwner != null, "The newOwner must be not null");
+
+    RangerPolicy policy = findManagedPolicy(metadataObject);
+    if (policy != null) {
+      policy.getPolicyItems().stream()
+          .filter(
+              policyItem -> {
+                return policyItem.getAccesses().stream()
+                    .allMatch(
+                        policyItemAccess -> {
+                          return 
ownerPrivileges.contains(policyItemAccess.getType());
+                        });
+              })
+          .forEach(
+              policyItem -> {
+                if (preOwner != null) {
+                  if (preOwner.type() == Owner.Type.USER) {
+                    policyItem.getUsers().removeIf(preOwner.name()::equals);
+                  } else {
+                    policyItem.getGroups().removeIf(preOwner.name()::equals);
+                  }
+                }
+                if (newOwner != null) {
+                  if (newOwner.type() == Owner.Type.USER) {
+                    policyItem.getUsers().add(newOwner.name());
+                  } else {
+                    policyItem.getGroups().add(newOwner.name());
+                  }
+                }
+              });
+    } else {
+      policy = new RangerPolicy();
+      policy.setService(rangerServiceName);
+      policy.setName(metadataObject.fullName());
+      policy.setPolicyLabels(Lists.newArrayList(MANAGED_BY_GRAVITINO));
+
+      List<String> nsMetadataObject =
+          
Lists.newArrayList(SecurableObjects.DOT_SPLITTER.splitToList(metadataObject.fullName()));
+      if (nsMetadataObject.size() > 4) {
+        // The max level of the securable object is `catalog.db.table.column`
+        throw new RuntimeException("The securable object than 4");
+      }
+      nsMetadataObject.remove(0); // remove `catalog`
+
+      for (int i = 0; i < nsMetadataObject.size(); i++) {
+        RangerPolicy.RangerPolicyResource policyResource =
+            new RangerPolicy.RangerPolicyResource(nsMetadataObject.get(i));
+        policy
+            .getResources()
+            .put(
+                i == 0
+                    ? RangerDefines.RESOURCE_DATABASE
+                    : i == 1 ? RangerDefines.RESOURCE_TABLE : 
RangerDefines.RESOURCE_COLUMN,
+                policyResource);
+      }
+
+      RangerPolicy finalPolicy = policy;
+      ownerPrivileges.stream()
+          .forEach(
+              ownerPrivilege -> {
+                RangerPolicy.RangerPolicyItem policyItem = new 
RangerPolicy.RangerPolicyItem();
+                policyItem
+                    .getAccesses()
+                    .add(new 
RangerPolicy.RangerPolicyItemAccess(ownerPrivilege));
+                if (newOwner != null) {
+                  if (newOwner.type() == Owner.Type.USER) {
+                    policyItem.getUsers().add(newOwner.name());
+                  } else {
+                    policyItem.getGroups().add(newOwner.name());
+                  }
+                }
+                finalPolicy.getPolicyItems().add(policyItem);
+              });
+    }
+    try {
+      if (policy.getId() == null) {
+        rangerClient.createPolicy(policy);
+      } else {
+        rangerClient.updatePolicy(policy.getId(), policy);
+      }
+    } catch (RangerServiceException e) {
+      throw new RuntimeException(e);
+    }
+
+    return Boolean.TRUE;
+  }
+
+  /**
+   * Because one Ranger policy maybe contain multiple Gravitino securable 
objects, <br>
+   * So we need to find the corresponding policy item mapping to set the user.
+   */
+  @Override
+  public Boolean onGrantedRolesToUser(List<Role> roles, User user) throws 
RuntimeException {
+    // If the user is not exist, then create it.
+    onUserAdded(user);
+
+    AtomicReference<Boolean> execResult = new AtomicReference<>(Boolean.TRUE);
+    roles.stream()
+        .forEach(
+            role -> {
+              createRangerRoleIfNotExists(role.name());
+              GrantRevokeRoleRequest grantRevokeRoleRequest =
+                  createGrantRevokeRoleRequest(role.name(), user.name(), null);
+              try {
+                rangerClient.grantRole(rangerServiceName, 
grantRevokeRoleRequest);
+              } catch (RangerServiceException e) {
+                // ignore exception, support idempotent operation
+              }
+
+              role.securableObjects().stream()
+                  .forEach(
+                      securableObject -> {
+                        RangerPolicy policy = 
findManagedPolicy(securableObject);
+                        if (policy == null) {
+                          LOG.warn(
+                              "The policy does not exist for the securable 
object({})",
+                              securableObject);
+                          execResult.set(Boolean.FALSE);
+                          return;
+                        }
+
+                        securableObject.privileges().stream()
+                            .forEach(
+                                privilege -> {
+                                  mapPrivileges
+                                      .getOrDefault(privilege.name(), 
Collections.emptySet())
+                                      .stream()
+                                      .forEach(
+                                          rangerPrivilegeName -> {
+                                            policy.getPolicyItems().stream()
+                                                .forEach(
+                                                    policyItem -> {
+                                                      if 
(policyItem.getAccesses().stream()
+                                                          .anyMatch(
+                                                              policyItemAccess 
->
+                                                                  
policyItemAccess
+                                                                      
.getType()
+                                                                      .equals(
+                                                                          
rangerPrivilegeName))) {
+                                                        if (!policyItem
+                                                            .getRoles()
+                                                            
.contains(role.name())) {
+                                                          
policyItem.getRoles().add(role.name());
+                                                        }
+                                                      }
+                                                    });
+                                            try {
+                                              
rangerClient.updatePolicy(policy.getId(), policy);
+                                            } catch (RangerServiceException e) 
{
+                                              throw new RuntimeException(e);
+                                            }
+                                          });
+                                });
+                      });
+            });
+
+    return Boolean.TRUE;
+  }
+
+  /**
+   * Because one Ranger policy maybe contain multiple Gravitino securable 
objects, <br>
+   * So we need to find the corresponding policy item mapping to remove the 
user.
+   */
+  @Override
+  public Boolean onRevokedRolesFromUser(List<Role> roles, User user) throws 
RuntimeException {
+    roles.stream()
+        .forEach(
+            role -> {
+              checkRangerRole(role.name());
+              GrantRevokeRoleRequest grantRevokeRoleRequest =
+                  createGrantRevokeRoleRequest(role.name(), user.name(), null);
+              try {
+                rangerClient.revokeRole(rangerServiceName, 
grantRevokeRoleRequest);
+              } catch (RangerServiceException e) {
+                // Ignore exception to support idempotent operation
+              }
+            });
+
+    return Boolean.TRUE;
+  }
+
+  /**
+   * Grant the roles to the group. <br>
+   * 1. Create a group in the Ranger if the group is not exist. <br>
+   * 2. Create a role in the Ranger if the role is not exist. <br>
+   * 3. Add this group to the role. <br>
+   * 4. Add the role to the policy item base the metadata object. <br>
+   *
+   * @param roles The roles to grant to the group.
+   * @param group The group to grant the roles.
+   */
+  @Override
+  public Boolean onGrantedRolesToGroup(List<Role> roles, Group group) throws 
RuntimeException {
+    // If the group is not exist, then create it.
+    onGroupAdded(group);
+
+    AtomicReference<Boolean> execResult = new AtomicReference<>(Boolean.TRUE);
+    roles.stream()
+        .forEach(
+            role -> {
+              createRangerRoleIfNotExists(role.name());
+              GrantRevokeRoleRequest grantRevokeRoleRequest =
+                  createGrantRevokeRoleRequest(role.name(), null, 
group.name());
+              try {
+                rangerClient.grantRole(rangerServiceName, 
grantRevokeRoleRequest);
+              } catch (RangerServiceException e) {
+                // Ignore exception to support idempotent operation
+              }
+
+              role.securableObjects().stream()
+                  .forEach(
+                      securableObject -> {
+                        RangerPolicy policy = 
findManagedPolicy(securableObject);
+                        if (policy == null) {
+                          LOG.warn(
+                              "The policy is not exist for the securable 
object({})",
+                              securableObject);
+                          execResult.set(Boolean.FALSE);
+                          return;
+                        }
+
+                        securableObject.privileges().stream()
+                            .forEach(
+                                privilege -> {
+                                  mapPrivileges
+                                      .getOrDefault(privilege.name(), 
Collections.emptySet())
+                                      .stream()
+                                      .forEach(
+                                          rangerPrivilege -> {
+                                            policy.getPolicyItems().stream()
+                                                .forEach(
+                                                    policyItem -> {
+                                                      if 
(policyItem.getAccesses().stream()
+                                                          .anyMatch(
+                                                              policyItemAccess 
->
+                                                                  
policyItemAccess
+                                                                      
.getType()
+                                                                      
.equals(rangerPrivilege))) {
+                                                        if (!policyItem
+                                                            .getRoles()
+                                                            
.contains(role.name())) {
+                                                          
policyItem.getRoles().add(role.name());
+                                                        }
+                                                      }
+                                                    });
+                                            try {
+                                              
rangerClient.updatePolicy(policy.getId(), policy);
+                                            } catch (RangerServiceException e) 
{
+                                              throw new RuntimeException(e);
+                                            }
+                                          });
+                                });
+                      });
+            });
+    if (!execResult.get()) {
+      return Boolean.FALSE;
+    }
+
+    return Boolean.TRUE;
+  }
+
+  @Override
+  public Boolean onRevokedRolesFromGroup(List<Role> roles, Group group) throws 
RuntimeException {
+    roles.stream()
+        .forEach(
+            role -> {
+              checkRangerRole(role.name());
+              GrantRevokeRoleRequest grantRevokeRoleRequest =
+                  createGrantRevokeRoleRequest(role.name(), null, 
group.name());
+              try {
+                rangerClient.revokeRole(rangerServiceName, 
grantRevokeRoleRequest);
+              } catch (RangerServiceException e) {
+                // Ignore exception to support idempotent operation
+              }
+            });
+
+    return Boolean.TRUE;
+  }
+
+  @Override
+  public Boolean onUserAdded(User user) throws RuntimeException {
+    VXUserList list = rangerClient.searchUser(ImmutableMap.of("name", 
user.name()));
+    if (list.getListSize() > 0) {
+      LOG.warn("The user({}) is already exist in the Ranger!", user.name());
+      return Boolean.FALSE;
+    }
+
+    VXUser rangerUser = 
VXUser.builder().withName(user.name()).withDescription(user.name()).build();
+    return rangerClient.createUser(rangerUser);
+  }
+
+  @Override
+  public Boolean onUserRemoved(User user) throws RuntimeException {
+    VXUserList list = rangerClient.searchUser(ImmutableMap.of("name", 
user.name()));
+    if (list.getListSize() == 0) {
+      LOG.warn("The user({}) is not exist in the Ranger!", user);
+      return Boolean.FALSE;
+    }
+    rangerClient.deleteUser(list.getList().get(0).getId());
+    return Boolean.TRUE;
+  }
+
+  @Override
+  public Boolean onUserAcquired(User user) throws RuntimeException {
+    VXUserList list = rangerClient.searchUser(ImmutableMap.of("name", 
user.name()));
+    if (list.getListSize() == 0) {
+      LOG.warn("The user({}) is not exist in the Ranger!", user);
+      return Boolean.FALSE;
+    }
+    return Boolean.TRUE;
+  }
+
+  @Override
+  public Boolean onGroupAdded(Group group) throws RuntimeException {
+    return 
rangerClient.createGroup(VXGroup.builder().withName(group.name()).build());
+  }
+
+  @Override
+  public Boolean onGroupRemoved(Group group) throws RuntimeException {
+    VXGroupList list = rangerClient.searchGroup(ImmutableMap.of("name", 
group.name()));
+    if (list.getListSize() == 0) {
+      LOG.warn("The group({}) is not exist in the Ranger!", group);
+      return Boolean.FALSE;
+    }
+    return rangerClient.deleteGroup(list.getList().get(0).getId());
+  }
+
+  @Override
+  public Boolean onGroupAcquired(Group group) {
+    VXGroupList vxGroupList = rangerClient.searchGroup(ImmutableMap.of("name", 
group.name()));
+    if (vxGroupList.getListSize() == 0) {
+      LOG.warn("The group({}) is not exist in the Ranger!", group);

Review Comment:
   is not exists



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@gravitino.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to