shizhengchao created FLINK-25703:
------------------------------------

             Summary: In Batch, I think .stagingPath should be created when the task is running to prevent permission issues caused by inconsistency of hadoop usernames
                 Key: FLINK-25703
                 URL: https://issues.apache.org/jira/browse/FLINK-25703
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.14.3
            Reporter: shizhengchao


If HADOOP_USER_NAME is set to user_a on the client side, but the task runs in the TaskManager as user_b (which is possible), the batch job fails with a permission error:

 
{code:java}
Caused by: org.apache.hadoop.security.AccessControlException: Permission denied: user=user_b, access=WRITE, inode="/where/to/whatever/.staging__1642575264185":user_a:supergroup:drwxr-xr-x
{code}
This happens because .staging__1642575264185 was created by user_a on the client, and with mode drwxr-xr-x only its owner (user_a) may write to it.
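To see why user_b is rejected, here is a minimal, simplified model of the owner/group/other check HDFS applies to the inode in the exception. It ignores ACLs and the HDFS superuser, and it assumes user_b is not a member of supergroup; the class and method names are hypothetical.

```java
import java.util.Set;

/**
 * Hypothetical, simplified model of the POSIX-style permission check behind
 * the AccessControlException above. ACLs and the superuser are ignored.
 */
public class StagingDirPermissionCheck {

    /** Returns true if {@code user} may write to an inode with the given owner, group and mode string. */
    static boolean canWrite(String user, Set<String> userGroups,
                            String owner, String group, String mode) {
        // mode looks like "drwxr-xr-x": index 0 is the file type,
        // followed by the owner, group and other triplets
        int tripletStart;
        if (user.equals(owner)) {
            tripletStart = 1;          // owner bits
        } else if (userGroups.contains(group)) {
            tripletStart = 4;          // group bits
        } else {
            tripletStart = 7;          // other bits
        }
        // the write bit is the second character of the selected triplet
        return mode.charAt(tripletStart + 1) == 'w';
    }

    public static void main(String[] args) {
        String mode = "drwxr-xr-x";
        // user_a created the staging dir on the client, so it owns it
        System.out.println(canWrite("user_a", Set.of(), "user_a", "supergroup", mode)); // true
        // user_b runs the TaskManager, is not the owner and (assumed) not in supergroup
        System.out.println(canWrite("user_b", Set.of(), "user_a", "supergroup", mode)); // false
    }
}
```

Even if user_b were in supergroup, the group triplet r-x still has no write bit, so the write would be denied as well.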

 

So we should move the code that creates the .staging directory into the FileSystemOutputFormat#open method, so that the directory is created by the user the task actually runs as:
{code:java}
@Override
public void open(int taskNumber, int numTasks) throws IOException {
    try {
        // create the staging (tmp) directory as the user running this task, if it does not exist yet
        checkOrCreateTmpPath();
        PartitionTempFileManager fileManager =
                new PartitionTempFileManager(
                        fsFactory, tmpPath, taskNumber, CHECKPOINT_ID, outputFileConfig);
        PartitionWriter.Context<T> context =
                new PartitionWriter.Context<>(parameters, formatFactory);
        writer =
                PartitionWriterFactory.<T>get(
                                partitionColumns.length - staticPartitions.size() > 0,
                                dynamicGrouped,
                                staticPartitions)
                        .create(context, fileManager, computer);
    } catch (Exception e) {
        throw new TableException("Exception in open", e);
    }
}

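// Called by every parallel subtask; FileSystem#mkdirs has "mkdir -p" semantics,
// so an already existing directory is not an error.
private void checkOrCreateTmpPath() throws Exception {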
    FileSystem fs = tmpPath.getFileSystem();
    Preconditions.checkState(fs.exists(tmpPath) || fs.mkdirs(tmpPath),
        "Failed to create staging dir " + tmpPath);
}
 {code}
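Because open() runs once per parallel subtask, several subtasks may try to create the same staging directory at about the same time. The sketch below is a hypothetical, local-filesystem stand-in for checkOrCreateTmpPath(): it uses java.nio.file.Files#createDirectories instead of Flink's FileSystem#mkdirs (both treat an existing directory as success) and simulates the subtasks with a thread pool. The class name, openAllSubtasks and the temp-directory location are assumptions for illustration only.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical local-filesystem stand-in for checkOrCreateTmpPath(). */
public class LazyStagingDirDemo {

    /** Creates the staging dir if missing; safe to call from many subtasks at once. */
    static void checkOrCreateTmpPath(Path tmpPath) throws IOException {
        // createDirectories does not throw if the directory already exists,
        // similar to the "mkdir -p" semantics of Flink's FileSystem#mkdirs
        Files.createDirectories(tmpPath);
        if (!Files.isDirectory(tmpPath)) {
            throw new IllegalStateException("Failed to create staging dir " + tmpPath);
        }
    }

    /** Simulates numTasks parallel subtasks, each calling checkOrCreateTmpPath from open(). */
    static boolean openAllSubtasks(Path tmpPath, int numTasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(numTasks);
        try {
            List<Future<Void>> results = new ArrayList<>();
            for (int task = 0; task < numTasks; task++) {
                results.add(pool.submit(() -> {
                    checkOrCreateTmpPath(tmpPath);
                    return null;
                }));
            }
            for (Future<Void> f : results) {
                f.get(); // rethrows (wrapped) any failure from a subtask
            }
        } finally {
            pool.shutdown();
        }
        return Files.isDirectory(tmpPath);
    }

    public static void main(String[] args) throws Exception {
        Path base = Files.createTempDirectory("staging-demo");
        Path tmpPath = base.resolve(".staging__" + System.currentTimeMillis());
        System.out.println(openAllSubtasks(tmpPath, 4)); // true
    }
}
```

On HDFS, a directory created this way would be owned by the user the TaskManager runs as (user_b in the example above) rather than by the client user, which is the point of the proposal.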
 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
