This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b670373562 [INLONG-11195][Manager] It is not allowed to modify group 
information when ordinary users are not responsible (#11196)
b670373562 is described below

commit b670373562734e76e3c5cf7106a10a1c4f5b53d5
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Wed Sep 25 14:13:46 2024 +0800

    [INLONG-11195][Manager] It is not allowed to modify group information when 
ordinary users are not responsible (#11196)
---
 .../service/group/InlongGroupServiceImpl.java      |  5 +++++
 .../service/sink/StreamSinkServiceImpl.java        | 15 ++++++++++++---
 .../service/source/StreamSourceServiceImpl.java    |  8 +++++++-
 .../service/stream/InlongStreamServiceImpl.java    | 22 ++++++++++++++++++++++
 .../inlong/manager/service/user/UserService.java   |  9 +++++++++
 .../manager/service/user/UserServiceImpl.java      | 13 +++++++++++++
 6 files changed, 68 insertions(+), 4 deletions(-)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index 3ee94a23ce..66abc58937 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -85,6 +85,7 @@ import 
org.apache.inlong.manager.service.source.bounded.BoundedSourceType;
 import org.apache.inlong.manager.service.stream.InlongStreamService;
 import org.apache.inlong.manager.service.tenant.InlongTenantService;
 import org.apache.inlong.manager.service.user.InlongRoleService;
+import org.apache.inlong.manager.service.user.UserService;
 import org.apache.inlong.manager.service.workflow.WorkflowService;
 
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -166,6 +167,8 @@ public class InlongGroupServiceImpl implements 
InlongGroupService {
     private InlongRoleService inlongRoleService;
     @Autowired
     private TenantUserRoleEntityMapper tenantUserRoleEntityMapper;
+    @Autowired
+    private UserService userService;
 
     @Autowired
     ScheduleOperator scheduleOperator;
@@ -501,6 +504,8 @@ public class InlongGroupServiceImpl implements 
InlongGroupService {
             LOGGER.error("inlong group not found by groupId={}", groupId);
             throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
         }
+        userService.checkUser(entity.getInCharges(), operator,
+                "Current user does not have permission to update group info");
         chkUnmodifiableParams(entity, request);
         // check whether the current status can be modified
         doUpdateCheck(entity, request, operator);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 4ce1d1c76c..dab2cdc93a 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -63,6 +63,7 @@ import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.user.UserInfo;
 import org.apache.inlong.manager.service.group.GroupCheckService;
 import org.apache.inlong.manager.service.stream.InlongStreamProcessService;
+import org.apache.inlong.manager.service.user.UserService;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -138,6 +139,8 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
     private AutowireCapableBeanFactory autowireCapableBeanFactory;
     @Autowired
     private ObjectMapper objectMapper;
+    @Autowired
+    private UserService userService;
     // To avoid circular dependencies, you cannot use @Autowired, it will be 
injected by AutowireCapableBeanFactory
     private InlongStreamProcessService streamProcessOperation;
 
@@ -449,7 +452,9 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
             throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
         }
         chkUnmodifiableParams(curEntity, request);
-        groupCheckService.checkGroupStatus(request.getInlongGroupId(), 
operator);
+        InlongGroupEntity groupEntity = 
groupCheckService.checkGroupStatus(request.getInlongGroupId(), operator);
+        userService.checkUser(groupEntity.getInCharges(), operator,
+                "Current user does not have permission to update sink info");
         // Check whether the stream exist or not
         InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(
                 request.getInlongGroupId(), request.getInlongStreamId());
@@ -526,7 +531,9 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
         StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
         Preconditions.expectNotNull(entity, 
ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
 
-        groupCheckService.checkGroupStatus(entity.getInlongGroupId(), 
operator);
+        InlongGroupEntity groupEntity = 
groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
+        userService.checkUser(groupEntity.getInCharges(), operator,
+                "Current user does not have permission to delete sink info");
 
         StreamSinkOperator sinkOperator = 
operatorFactory.getInstance(entity.getSinkType());
         sinkOperator.deleteOpt(entity, operator);
@@ -553,7 +560,9 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
         Preconditions.expectNotNull(entity, String.format("stream sink not 
exist by groupId=%s streamId=%s sinkName=%s",
                 groupId, streamId, sinkName));
 
-        groupCheckService.checkGroupStatus(entity.getInlongGroupId(), 
operator);
+        InlongGroupEntity groupEntity = 
groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
+        userService.checkUser(groupEntity.getInCharges(), operator,
+                "Current user does not have permission to delete sink info");
 
         StreamSinkOperator sinkOperator = 
operatorFactory.getInstance(entity.getSinkType());
         sinkOperator.deleteOpt(entity, operator);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index c92a544054..0241524dcf 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -45,6 +45,7 @@ import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.pojo.user.UserInfo;
 import org.apache.inlong.manager.service.group.GroupCheckService;
+import org.apache.inlong.manager.service.user.UserService;
 
 import com.github.pagehelper.Page;
 import com.github.pagehelper.PageHelper;
@@ -90,6 +91,8 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
     private StreamSourceFieldEntityMapper sourceFieldMapper;
     @Autowired
     private GroupCheckService groupCheckService;
+    @Autowired
+    private UserService userService;
 
     @Override
     @Transactional(rollbackFor = Throwable.class, propagation = 
Propagation.REQUIRES_NEW)
@@ -296,6 +299,8 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
             throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
                     String.format("InlongGroup does not exist with 
InlongGroupId=%s", groupId));
         }
+        userService.checkUser(groupEntity.getInCharges(), operator,
+                "Current user does not have permission to update source info");
         StreamSourceOperator sourceOperator = 
operatorFactory.getInstance(request.getSourceType());
         // Remove id in sourceField when save
         List<StreamField> streamFields = request.getFieldList();
@@ -334,7 +339,8 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
             throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
                     String.format("InlongGroup does not exist with 
InlongGroupId=%s", entity.getInlongGroupId()));
         }
-
+        userService.checkUser(groupEntity.getInCharges(), operator,
+                "Current user does not have permission to delete source info");
         SourceStatus curStatus = SourceStatus.forCode(entity.getStatus());
         SourceStatus nextStatus = SourceStatus.TO_BE_ISSUED_DELETE;
         // if source is frozen|failed|new, or if it is a template source or 
auto push source, delete directly
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 93692aeb28..101c39d24f 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -67,6 +67,7 @@ import 
org.apache.inlong.manager.service.sink.SinkOperatorFactory;
 import org.apache.inlong.manager.service.sink.StreamSinkOperator;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.inlong.manager.service.source.StreamSourceService;
+import org.apache.inlong.manager.service.user.UserService;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -150,6 +151,8 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
     @Autowired
     @Lazy
     private SinkOperatorFactory sinkOperatorFactory;
+    @Autowired
+    private UserService userService;
 
     @Transactional(rollbackFor = Throwable.class)
     @Override
@@ -451,6 +454,13 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
         Preconditions.expectNotNull(request, "inlong stream request is empty");
         String groupId = request.getInlongGroupId();
         Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
+        InlongGroupEntity groupEntity = 
groupMapper.selectByGroupIdWithoutTenant(groupId);
+        if (groupEntity == null) {
+            throw new BusinessException(String.format("InlongGroup does not 
exist with InlongGroupId=%s", groupId));
+        }
+        userService.checkUser(groupEntity.getInCharges(), operator,
+                "Current user does not have permission to update stream info");
+
         String streamId = request.getInlongStreamId();
         Preconditions.expectNotBlank(streamId, 
ErrorCodeEnum.STREAM_ID_IS_EMPTY);
 
@@ -514,6 +524,12 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
 
         // Check if it can be deleted
         this.checkGroupStatusIsTemp(groupId);
+        InlongGroupEntity groupEntity = 
groupMapper.selectByGroupIdWithoutTenant(groupId);
+        if (groupEntity == null) {
+            throw new BusinessException(String.format("InlongGroup does not 
exist with InlongGroupId=%s", groupId));
+        }
+        userService.checkUser(groupEntity.getInCharges(), operator,
+                "Current user does not have permission to delete stream info");
 
         InlongStreamEntity entity = streamMapper.selectByIdentifier(groupId, 
streamId);
         if (entity == null) {
@@ -951,6 +967,12 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
     @Override
     public List<BriefMQMessage> listMessages(QueryMessageRequest request, 
String operator) {
         InlongGroupEntity groupEntity = 
groupMapper.selectByGroupId(request.getGroupId());
+        if (groupEntity == null) {
+            throw new BusinessException(
+                    String.format("InlongGroup does not exist with 
InlongGroupId=%s", request.getGroupId()));
+        }
+        userService.checkUser(groupEntity.getInCharges(), operator, String
+                .format("Current user does not have permission to query 
message for groupId=%s", request.getGroupId()));
         InlongGroupOperator instance = 
groupOperatorFactory.getInstance(groupEntity.getMqType());
         InlongGroupInfo groupInfo = instance.getFromEntity(groupEntity);
         InlongStreamInfo inlongStreamInfo = get(request.getGroupId(), 
request.getStreamId());
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/user/UserService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/user/UserService.java
index 9e2de4bfea..be79bb6980 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/user/UserService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/user/UserService.java
@@ -82,4 +82,13 @@ public interface UserService {
      */
     void login(UserLoginRequest req);
 
+    /**
+     * Check the given user is the admin or is one of the in charges.
+     *
+     * @param inCharges incharge list
+     * @param user current user name
+     * @param errMsg error message
+     */
+    void checkUser(String inCharges, String user, String errMsg);
+
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/user/UserServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/user/UserServiceImpl.java
index a8ac5f4bd7..36a75c4cc3 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/user/UserServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/user/UserServiceImpl.java
@@ -50,11 +50,13 @@ import org.apache.inlong.manager.pojo.user.UserInfo;
 import org.apache.inlong.manager.pojo.user.UserLoginLockStatus;
 import org.apache.inlong.manager.pojo.user.UserLoginRequest;
 import org.apache.inlong.manager.pojo.user.UserRequest;
+import org.apache.inlong.manager.pojo.user.UserRoleCode;
 
 import com.github.pagehelper.Page;
 import com.github.pagehelper.PageHelper;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Sets;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.shiro.SecurityUtils;
@@ -351,6 +353,17 @@ public class UserServiceImpl implements UserService {
         loginLockStatusMap.put(username, userLoginLockStatus);
     }
 
+    @Override
+    public void checkUser(String inCharges, String user, String errMsg) {
+        Set<String> userRoles = LoginUserUtils.getLoginUser().getRoles();
+        boolean isAdmin = false;
+        if (CollectionUtils.isNotEmpty(userRoles)) {
+            isAdmin = userRoles.contains(UserRoleCode.INLONG_ADMIN) || 
userRoles.contains(UserRoleCode.TENANT_ADMIN);
+        }
+        boolean isInCharge = Preconditions.inSeparatedString(user, inCharges, 
InlongConstants.COMMA);
+        Preconditions.expectTrue(isInCharge || isAdmin, errMsg);
+    }
+
     public void removeInChargeForGroup(String user, String operator) {
         InlongGroupPageRequest pageRequest = new InlongGroupPageRequest();
         pageRequest.setCurrentUser(user);

Reply via email to