This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 734520a77b1fc48adf24a2dba8533a85b143c885
Author: wuwenchi <wuwenchi...@hotmail.com>
AuthorDate: Fri Apr 19 11:25:12 2024 +0800

    [bugfix](hive) delete write path after hive insert (#33798)
    
    Issue #31442
    
    1. delete files according to the query id
    2. delete the write path after the insert (sketched below)
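
The gist of the change, as a minimal standalone sketch (not part of the patch): on rollback, walk the staging write path and delete only the files whose names carry the failed insert's query id, so files written by other queries are left untouched; directories that end up empty are pruned as well. The class and method names below (StagingCleanupSketch, recursiveDelete) and the paths are illustrative only, not Doris APIs.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.stream.Stream;

    public class StagingCleanupSketch {

        // Delete files under 'dir' whose names start with 'queryId'.
        // Returns true if every eligible entry (and, when requested, the
        // directory itself) could be removed.
        static boolean recursiveDelete(Path dir, String queryId, boolean deleteEmptyDir) throws IOException {
            boolean allDeleted = true;
            try (Stream<Path> entries = Files.list(dir)) {
                for (Path entry : (Iterable<Path>) entries::iterator) {
                    if (Files.isDirectory(entry)) {
                        allDeleted &= recursiveDelete(entry, queryId, true);
                    } else if (entry.getFileName().toString().startsWith(queryId)) {
                        Files.deleteIfExists(entry);   // a file written by this query
                    } else {
                        allDeleted = false;            // someone else's file: keep it
                    }
                }
            }
            if (allDeleted && deleteEmptyDir) {
                Files.deleteIfExists(dir);             // prune the now-empty staging dir
            }
            return allDeleted;
        }

        public static void main(String[] args) throws IOException {
            // The staging path and query id are hypothetical, standing in for
            // the write path declared when the insert began.
            Path stagingDir = Paths.get("/tmp/doris-staging");
            if (Files.isDirectory(stagingDir)) {
                recursiveDelete(stagingDir, "query_123", true);
            }
        }
    }

Keying the cleanup on a query-id prefix is what lets a failed insert clean up after itself without touching files that a concurrent insert into the same table may have staged.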
---
 .../doris/datasource/hive/HMSTransaction.java      |  59 ++++---
 .../commands/insert/HiveInsertCommandContext.java  |  18 +++
 .../plans/commands/insert/HiveInsertExecutor.java  |   9 ++
 .../commands/insert/InsertIntoTableCommand.java    |   3 +-
 .../org/apache/doris/planner/HiveTableSink.java    |   1 +
 .../doris/datasource/hive/HmsCommitTest.java       | 173 +++++++++++++++++----
 6 files changed, 215 insertions(+), 48 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
index 7a1d4389096..32dd083c2ad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -27,6 +27,7 @@ import org.apache.doris.common.profile.SummaryProfile;
 import org.apache.doris.fs.FileSystem;
 import org.apache.doris.fs.FileSystemUtil;
 import org.apache.doris.fs.remote.RemoteFile;
+import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.THivePartitionUpdate;
 import org.apache.doris.thrift.TUpdateMode;
@@ -72,6 +73,7 @@ public class HMSTransaction implements Transaction {
     private String dbName;
     private String tbName;
     private Optional<SummaryProfile> summaryProfile = Optional.empty();
+    private String queryId;
 
     private final Map<DatabaseTableName, Action<TableAndMore>> tableActions = new HashMap<>();
     private final Map<DatabaseTableName, Map<List<String>, Action<PartitionAndMore>>>
@@ -79,6 +81,7 @@ public class HMSTransaction implements Transaction {
 
     private HmsCommitter hmsCommitter;
     private List<THivePartitionUpdate> hivePartitionUpdates = Lists.newArrayList();
+    private String declaredIntentionsToWrite;
 
     public HMSTransaction(HiveMetadataOps hiveOps) {
         this.hiveOps = hiveOps;
@@ -124,6 +127,11 @@ public class HMSTransaction implements Transaction {
         }
     }
 
+    public void beginInsertTable(HiveInsertCommandContext ctx) {
+        declaredIntentionsToWrite = ctx.getWritePath();
+        queryId = ctx.getQueryId();
+    }
+
     public void finishInsertTable(String dbName, String tbName) {
         this.dbName = dbName;
         this.tbName = tbName;
@@ -239,8 +247,12 @@ public class HMSTransaction implements Transaction {
             hmsCommitter.doCommit();
         } catch (Throwable t) {
             LOG.warn("Failed to commit for {}.{}, abort it.", dbName, tbName);
-            hmsCommitter.abort();
-            hmsCommitter.rollback();
+            try {
+                hmsCommitter.abort();
+                hmsCommitter.rollback();
+            } catch (RuntimeException e) {
+                t.addSuppressed(new Exception("Failed to roll back after commit failure", e));
+            }
             throw t;
         } finally {
             hmsCommitter.runClearPathsForFinish();
@@ -560,10 +572,10 @@ public class HMSTransaction implements Transaction {
             return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build());
         }
 
-        return doRecursiveDeleteFiles(directory, deleteEmptyDir);
+        return doRecursiveDeleteFiles(directory, deleteEmptyDir, queryId);
     }
 
-    private DeleteRecursivelyResult doRecursiveDeleteFiles(Path directory, boolean deleteEmptyDir) {
+    private DeleteRecursivelyResult doRecursiveDeleteFiles(Path directory, boolean deleteEmptyDir, String queryId) {
         List<RemoteFile> allFiles = new ArrayList<>();
         Set<String> allDirs = new HashSet<>();
         Status statusFile = fs.listFiles(directory.toString(), true, allFiles);
@@ -577,15 +589,18 @@ public class HMSTransaction implements Transaction {
         boolean allDescendentsDeleted = true;
         ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder();
         for (RemoteFile file : allFiles) {
-            String fileName = file.getName();
-            if (!deleteIfExists(new Path(fileName))) {
+            if (file.getName().startsWith(queryId)) {
+                if (!deleteIfExists(file.getPath())) {
+                    allDescendentsDeleted = false;
+                    notDeletedEligibleItems.add(file.getPath().toString());
+                }
+            } else {
                 allDescendentsDeleted = false;
-                notDeletedEligibleItems.add(fileName);
             }
         }
 
         for (String dir : allDirs) {
-            DeleteRecursivelyResult subResult = doRecursiveDeleteFiles(new Path(dir), deleteEmptyDir);
+            DeleteRecursivelyResult subResult = doRecursiveDeleteFiles(new Path(dir), deleteEmptyDir, queryId);
             if (!subResult.dirNotExists()) {
                 allDescendentsDeleted = false;
             }
@@ -1038,6 +1053,9 @@ public class HMSTransaction implements Transaction {
         }
 
         private void undoAddPartitionsTask() {
+            if (addPartitionsTask.isEmpty()) {
+                return;
+            }
 
             HivePartition firstPartition = addPartitionsTask.getPartitions().get(0).getPartition();
             String dbName = firstPartition.getDbName();
@@ -1319,6 +1337,10 @@ public class HMSTransaction implements Transaction {
             summaryProfile.ifPresent(SummaryProfile::setHmsUpdatePartitionTime);
         }
 
+        public void pruneAndDeleteStagingDirectories() {
+            recursiveDeleteItems(new Path(declaredIntentionsToWrite), true);
+        }
+
         public void doNothing() {
             // do nothing
             // only for regression test and unit test to throw exception
@@ -1342,6 +1364,7 @@ public class HMSTransaction implements Transaction {
 
         public void rollback() {
             //delete write path
+            pruneAndDeleteStagingDirectories();
         }
     }
 
@@ -1372,22 +1395,22 @@ public class HMSTransaction implements Transaction {
     }
 
     public void wrapperAsyncRenameWithProfileSummary(Executor executor,
-                                              List<CompletableFuture<?>> renameFileFutures,
-                                              AtomicBoolean cancelled,
-                                              String origFilePath,
-                                              String destFilePath,
-                                              List<String> fileNames) {
+                                                     List<CompletableFuture<?>> renameFileFutures,
+                                                     AtomicBoolean cancelled,
+                                                     String origFilePath,
+                                                     String destFilePath,
+                                                     List<String> fileNames) {
         FileSystemUtil.asyncRenameFiles(
                 fs, executor, renameFileFutures, cancelled, origFilePath, destFilePath, fileNames);
         summaryProfile.ifPresent(profile -> profile.addRenameFileCnt(fileNames.size()));
     }
 
     public void wrapperAsyncRenameDirWithProfileSummary(Executor executor,
-                                                 List<CompletableFuture<?>> renameFileFutures,
-                                                 AtomicBoolean cancelled,
-                                                 String origFilePath,
-                                                 String destFilePath,
-                                                 Runnable runWhenPathNotExist) {
+                                                        List<CompletableFuture<?>> renameFileFutures,
+                                                        AtomicBoolean cancelled,
+                                                        String origFilePath,
+                                                        String destFilePath,
+                                                        Runnable runWhenPathNotExist) {
         FileSystemUtil.asyncRenameDir(
                 fs, executor, renameFileFutures, cancelled, origFilePath, destFilePath, runWhenPathNotExist);
         summaryProfile.ifPresent(SummaryProfile::incRenameDirCnt);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java
index 31e56fd6ccc..49d5a12c2bb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java
@@ -22,6 +22,8 @@ package org.apache.doris.nereids.trees.plans.commands.insert;
  */
 public class HiveInsertCommandContext extends InsertCommandContext {
     private boolean overwrite = false;
+    private String writePath;
+    private String queryId;
 
     public boolean isOverwrite() {
         return overwrite;
@@ -30,4 +32,20 @@ public class HiveInsertCommandContext extends InsertCommandContext {
     public void setOverwrite(boolean overwrite) {
         this.overwrite = overwrite;
     }
+
+    public String getWritePath() {
+        return writePath;
+    }
+
+    public void setWritePath(String writePath) {
+        this.writePath = writePath;
+    }
+
+    public String getQueryId() {
+        return queryId;
+    }
+
+    public void setQueryId(String queryId) {
+        this.queryId = queryId;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
index 16236c340a6..9421af79503 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
@@ -35,10 +35,12 @@ import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QueryState;
 import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.TransactionManager;
 import org.apache.doris.transaction.TransactionStatus;
 import org.apache.doris.transaction.TransactionType;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -97,6 +99,13 @@ public class HiveInsertExecutor extends AbstractInsertExecutor {
     @Override
     protected void beforeExec() {
         // check params
+        HMSTransaction transaction = (HMSTransaction) transactionManager.getTransaction(txnId);
+        Preconditions.checkArgument(insertCtx.isPresent(), "insert context must be present");
+        HiveInsertCommandContext ctx = (HiveInsertCommandContext) insertCtx.get();
+        TUniqueId tUniqueId = ConnectContext.get().queryId();
+        Preconditions.checkArgument(tUniqueId != null, "query id shouldn't be null");
+        ctx.setQueryId(DebugUtil.printId(tUniqueId));
+        transaction.beginInsertTable(ctx);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 61024345c06..7f326cf2212 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -173,7 +173,8 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
                         .setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
             } else if (physicalSink instanceof PhysicalHiveTableSink) {
                 HMSExternalTable hiveExternalTable = (HMSExternalTable) targetTableIf;
-                insertExecutor = new HiveInsertExecutor(ctx, hiveExternalTable, label, planner, insertCtx);
+                insertExecutor = new HiveInsertExecutor(ctx, hiveExternalTable, label, planner,
+                        Optional.of(insertCtx.orElse((new HiveInsertCommandContext()))));
                 // set hive query options
             } else {
                 // TODO: support other table types
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index a4fa1d11cb0..c45bdc1ebc9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -141,6 +141,7 @@ public class HiveTableSink extends DataSink {
         if (insertCtx.isPresent()) {
             HiveInsertCommandContext context = (HiveInsertCommandContext) insertCtx.get();
             tSink.setOverwrite(context.isOverwrite());
+            context.setWritePath(writeTempPath);
         }
         tDataSink = new TDataSink(getDataSinkType());
         tDataSink.setHiveTableSink(tSink);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
index dedc7738c86..ba87dd8f48e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
@@ -19,11 +19,14 @@ package org.apache.doris.datasource.hive;
 
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.datasource.TestHMSCachedClient;
 import org.apache.doris.fs.LocalDfsFileSystem;
+import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.THiveLocationParams;
 import org.apache.doris.thrift.THivePartitionUpdate;
+import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.thrift.TUpdateMode;
 
 import com.google.common.collect.Lists;
@@ -47,7 +50,11 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Optional;
+import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class HmsCommitTest {
 
@@ -61,6 +68,7 @@ public class HmsCommitTest {
     static String writeLocation;
     static String uri = "thrift://127.0.0.1:9083";
     static boolean hasRealHmsService = false;
+    private ConnectContext connectContext;
 
     @BeforeClass
     public static void beforeClass() throws Throwable {
@@ -70,10 +78,6 @@ public class HmsCommitTest {
         writeLocation = "file://" + writePath.toAbsolutePath() + "/";
         createTestHiveCatalog();
         createTestHiveDatabase();
-
-        // context
-        ConnectContext connectContext = new ConnectContext();
-        connectContext.setThreadLocalInfo();
     }
 
     @AfterClass
@@ -114,17 +118,21 @@ public class HmsCommitTest {
         partitionKeys.add("c3");
         String fileFormat = "orc";
         HiveTableMetadata tableMetadata = new HiveTableMetadata(
-                dbName, tbWithPartition, Optional.of(dbLocation + tbWithPartition),
+                dbName, tbWithPartition, Optional.of(dbLocation + tbWithPartition + UUID.randomUUID()),
                 columns, partitionKeys,
                 new HashMap<>(), fileFormat, "");
         hmsClient.createTable(tableMetadata, true);
 
         // create table for tbWithoutPartition
         HiveTableMetadata tableMetadata2 = new HiveTableMetadata(
-                    dbName, tbWithoutPartition, Optional.of(dbLocation + tbWithPartition),
+                    dbName, tbWithoutPartition, Optional.of(dbLocation + tbWithPartition + UUID.randomUUID()),
                     columns, new ArrayList<>(),
                     new HashMap<>(), fileFormat, "");
         hmsClient.createTable(tableMetadata2, true);
+
+        // context
+        connectContext = new ConnectContext();
+        connectContext.setThreadLocalInfo();
     }
 
     @After
@@ -142,6 +150,8 @@ public class HmsCommitTest {
 
     @Test
     public void testAppendPartitionForUnPartitionedTable() throws IOException {
+        genQueryID();
+        System.out.println(DebugUtil.printId(connectContext.queryId()));
         List<THivePartitionUpdate> pus = new ArrayList<>();
         pus.add(createRandomAppend(null));
         pus.add(createRandomAppend(null));
@@ -150,6 +160,8 @@ public class HmsCommitTest {
         Table table = hmsClient.getTable(dbName, tbWithoutPartition);
         assertNumRows(3, table);
 
+        genQueryID();
+        System.out.println(DebugUtil.printId(connectContext.queryId()));
         List<THivePartitionUpdate> pus2 = new ArrayList<>();
         pus2.add(createRandomAppend(null));
         pus2.add(createRandomAppend(null));
@@ -162,6 +174,8 @@ public class HmsCommitTest {
     @Test
     public void testOverwritePartitionForUnPartitionedTable() throws IOException {
         testAppendPartitionForUnPartitionedTable();
+
+        genQueryID();
         List<THivePartitionUpdate> pus = new ArrayList<>();
         pus.add(createRandomOverwrite(null));
         pus.add(createRandomOverwrite(null));
@@ -173,6 +187,7 @@ public class HmsCommitTest {
 
     @Test
     public void testNewPartitionForPartitionedTable() throws IOException {
+        genQueryID();
         List<THivePartitionUpdate> pus = new ArrayList<>();
         pus.add(createRandomNew("a"));
         pus.add(createRandomNew("a"));
@@ -194,6 +209,7 @@ public class HmsCommitTest {
     public void testAppendPartitionForPartitionedTable() throws IOException {
         testNewPartitionForPartitionedTable();
 
+        genQueryID();
         List<THivePartitionUpdate> pus = new ArrayList<>();
         pus.add(createRandomAppend("a"));
         pus.add(createRandomAppend("a"));
@@ -214,6 +230,8 @@ public class HmsCommitTest {
     @Test
     public void testOverwritePartitionForPartitionedTable() throws IOException {
         testAppendPartitionForPartitionedTable();
+
+        genQueryID();
         List<THivePartitionUpdate> pus = new ArrayList<>();
         pus.add(createRandomOverwrite("a"));
         pus.add(createRandomOverwrite("b"));
@@ -230,6 +248,7 @@ public class HmsCommitTest {
 
     @Test
     public void testNewManyPartitionForPartitionedTable() throws IOException {
+        genQueryID();
         List<THivePartitionUpdate> pus = new ArrayList<>();
         int nums = 150;
         for (int i = 0; i < nums; i++) {
@@ -242,6 +261,7 @@ public class HmsCommitTest {
             assertNumRows(1, p);
         }
 
+        genQueryID();
         try {
             commit(dbName, tbWithPartition, Collections.singletonList(createRandomNew("1")));
         } catch (Exception e) {
@@ -254,6 +274,7 @@ public class HmsCommitTest {
         // first add three partition: a,b,c
         testNewPartitionForPartitionedTable();
 
+        genQueryID();
         // second append two partition: a,x
         // but there is no 'x' partition in the previous table, so when verifying based on HMS,
         // it will throw exception
@@ -281,12 +302,17 @@ public class HmsCommitTest {
 
     public THivePartitionUpdate genOnePartitionUpdate(String partitionValue, TUpdateMode mode) throws IOException {
 
-        String uuid = UUID.randomUUID().toString();
+        String queryId = "";
+        if (connectContext.queryId() != null) {
+            queryId = DebugUtil.printId(connectContext.queryId());
+        }
+
         THiveLocationParams location = new THiveLocationParams();
-        String targetPath = dbLocation + uuid + "/" + partitionValue;
+        String targetPath = dbLocation + queryId + "/" + partitionValue;
 
         location.setTargetPath(targetPath);
-        location.setWritePath(writeLocation + partitionValue);
+        String writePath = writeLocation + queryId + "/" + partitionValue;
+        location.setWritePath(writePath);
 
         THivePartitionUpdate pu = new THivePartitionUpdate();
         if (partitionValue != null) {
@@ -296,9 +322,10 @@ public class HmsCommitTest {
         pu.setRowCount(1);
         pu.setFileSize(1);
         pu.setLocation(location);
-        String f1 = uuid + "f1";
-        String f2 = uuid + "f2";
-        String f3 = uuid + "f3";
+        String uuid = UUID.randomUUID().toString();
+        String f1 = queryId + "_" + uuid + "_f1.orc";
+        String f2 = queryId + "_" + uuid + "_f2.orc";
+        String f3 = queryId + "_" + uuid + "_f3.orc";
 
         pu.setFileNames(new ArrayList<String>() {
             {
@@ -312,9 +339,9 @@ public class HmsCommitTest {
             fs.makeDir(targetPath);
         }
 
-        fs.createFile(writeLocation + partitionValue + "/" + f1);
-        fs.createFile(writeLocation + partitionValue + "/" + f2);
-        fs.createFile(writeLocation + partitionValue + "/" + f3);
+        fs.createFile(writePath + "/" + f1);
+        fs.createFile(writePath + "/" + f2);
+        fs.createFile(writePath + "/" + f3);
         return pu;
     }
 
@@ -338,6 +365,11 @@ public class HmsCommitTest {
                        List<THivePartitionUpdate> hivePUs) {
         HMSTransaction hmsTransaction = new HMSTransaction(hmsOps);
         hmsTransaction.setHivePartitionUpdates(hivePUs);
+        HiveInsertCommandContext ctx = new HiveInsertCommandContext();
+        String queryId = DebugUtil.printId(ConnectContext.get().queryId());
+        ctx.setQueryId(queryId);
+        ctx.setWritePath(writeLocation + queryId + "/");
+        hmsTransaction.beginInsertTable(ctx);
         hmsTransaction.finishInsertTable(dbName, tableName);
         hmsTransaction.commit();
     }
@@ -352,6 +384,21 @@ public class HmsCommitTest {
         };
     }
 
+    public void mockAsyncRenameDir(Runnable runnable) {
+        new MockUp<HMSTransaction>(HMSTransaction.class) {
+            @Mock
+            private void wrapperAsyncRenameDirWithProfileSummary(Executor executor,
+                                                                 List<CompletableFuture<?>> renameFileFutures,
+                                                                 AtomicBoolean cancelled,
+                                                                 String origFilePath,
+                                                                 String destFilePath,
+                                                                 Runnable runWhenPathNotExist) {
+                runnable.run();
+                throw new RuntimeException("failed to rename dir");
+            }
+        };
+    }
+
     public void mockDoOther(Runnable runnable) {
         new MockUp<HMSTransaction.HmsCommitter>(HMSTransaction.HmsCommitter.class) {
             @Mock
@@ -372,8 +419,49 @@ public class HmsCommitTest {
         };
     }
 
+    public void genQueryID() {
+        connectContext.setQueryId(new TUniqueId(new Random().nextInt(), new Random().nextInt()));
+    }
+
+    @Test
+    public void testRollbackWritePath() throws IOException {
+        genQueryID();
+        List<THivePartitionUpdate> pus = new ArrayList<>();
+        pus.add(createRandomNew("a"));
+
+        THiveLocationParams location = pus.get(0).getLocation();
+
+        // For new partition, there should be no target path
+        Assert.assertFalse(fs.exists(location.getTargetPath()).ok());
+        Assert.assertTrue(fs.exists(location.getWritePath()).ok());
+
+        mockAsyncRenameDir(() -> {
+            // the commit will fail and leave some files behind in the write path
+            String writePath = location.getWritePath();
+            Assert.assertTrue(fs.exists(writePath).ok());
+            for (String file : pus.get(0).getFileNames()) {
+                Assert.assertTrue(fs.exists(writePath + "/" + file).ok());
+            }
+        });
+
+        try {
+            commit(dbName, tbWithPartition, pus);
+            Assert.assertTrue(false);
+        } catch (Exception e) {
+            // ignore
+        }
+
+        // After rollback, these files in write path will be deleted
+        String writePath = location.getWritePath();
+        Assert.assertFalse(fs.exists(writePath).ok());
+        for (String file : pus.get(0).getFileNames()) {
+            Assert.assertFalse(fs.exists(writePath + "/" + file).ok());
+        }
+    }
+
     @Test
     public void testRollbackNewPartitionForPartitionedTableForFilesystem() throws IOException {
+        genQueryID();
         List<THivePartitionUpdate> pus = new ArrayList<>();
         pus.add(createRandomNew("a"));
 
@@ -413,6 +501,8 @@ public class HmsCommitTest {
         // first create three partitions: a,b,c
         testNewPartitionForPartitionedTable();
 
+        genQueryID();
+
         // second add 'new partition' for 'x'
         //        add 'append partition' for 'a'
         // when 'doCommit', 'new partition' will be executed before 'append partition'
@@ -421,15 +511,16 @@ public class HmsCommitTest {
         pus.add(createRandomNew("x"));
         pus.add(createRandomAppend("a"));
 
-        THiveLocationParams location = pus.get(0).getLocation();
+        THiveLocationParams locationForX = pus.get(0).getLocation();
+        THiveLocationParams locationForA = pus.get(0).getLocation();
 
         // For new partition, there should be no target path
-        Assert.assertFalse(fs.exists(location.getTargetPath()).ok());
-        Assert.assertTrue(fs.exists(location.getWritePath()).ok());
+        Assert.assertFalse(fs.exists(locationForX.getTargetPath()).ok());
+        Assert.assertTrue(fs.exists(locationForX.getWritePath()).ok());
 
         mockUpdateStatisticsTaskException(() -> {
             // When the commit is completed, these files should be renamed successfully
-            String targetPath = location.getTargetPath();
+            String targetPath = locationForX.getTargetPath();
             Assert.assertTrue(fs.exists(targetPath).ok());
             for (String file : pus.get(0).getFileNames()) {
                 Assert.assertTrue(fs.exists(targetPath + "/" + file).ok());
@@ -448,11 +539,13 @@ public class HmsCommitTest {
         }
 
         // After rollback, these files will be deleted
-        String targetPath = location.getTargetPath();
+        String targetPath = locationForX.getTargetPath();
         Assert.assertFalse(fs.exists(targetPath).ok());
         for (String file : pus.get(0).getFileNames()) {
             Assert.assertFalse(fs.exists(targetPath + "/" + file).ok());
         }
+        Assert.assertFalse(fs.exists(locationForX.getWritePath()).ok());
+        Assert.assertFalse(fs.exists(locationForA.getWritePath()).ok());
         // x partition will be deleted
         Assert.assertThrows(
                 "the 'x' partition should be deleted",
@@ -466,24 +559,39 @@ public class HmsCommitTest {
         // first create three partitions: a,b,c
         testNewPartitionForPartitionedTable();
 
+        genQueryID();
         // second add 'new partition' for 'x'
         //        add 'append partition' for 'a'
         List<THivePartitionUpdate> pus = new ArrayList<>();
         pus.add(createRandomNew("x"));
         pus.add(createRandomAppend("a"));
 
-        THiveLocationParams location = pus.get(0).getLocation();
+        THiveLocationParams locationForParX = pus.get(0).getLocation();
+        // in test, targetPath is a random path
+        // but when appending a partition, it uses the location of the original partition as the targetPath
+        // so here we need to get the path of partition a
+        Partition a = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a"));
+        String location = a.getSd().getLocation();
+        pus.get(1).getLocation().setTargetPath(location);
+        THiveLocationParams locationForParA = pus.get(1).getLocation();
 
         // For new partition, there should be no target path
-        Assert.assertFalse(fs.exists(location.getTargetPath()).ok());
-        Assert.assertTrue(fs.exists(location.getWritePath()).ok());
+        Assert.assertFalse(fs.exists(locationForParX.getTargetPath()).ok());
+        Assert.assertTrue(fs.exists(locationForParX.getWritePath()).ok());
+
+        // For exist partition
+        Assert.assertTrue(fs.exists(locationForParA.getTargetPath()).ok());
 
         mockDoOther(() -> {
             // When the commit is completed, these files should be renamed successfully
-            String targetPath = location.getTargetPath();
-            Assert.assertTrue(fs.exists(targetPath).ok());
+            String targetPathForX = locationForParX.getTargetPath();
+            Assert.assertTrue(fs.exists(targetPathForX).ok());
             for (String file : pus.get(0).getFileNames()) {
-                Assert.assertTrue(fs.exists(targetPath + "/" + file).ok());
+                Assert.assertTrue(fs.exists(targetPathForX + "/" + file).ok());
+            }
+            String targetPathForA = locationForParA.getTargetPath();
+            for (String file : pus.get(1).getFileNames()) {
+                Assert.assertTrue(fs.exists(targetPathForA + "/" + file).ok());
             }
             // new partition will be executed,
             // so, we can get the new partition
@@ -503,11 +611,18 @@ public class HmsCommitTest {
         }
 
         // After rollback, these files will be deleted
-        String targetPath = location.getTargetPath();
-        Assert.assertFalse(fs.exists(targetPath).ok());
+        String targetPathForX = locationForParX.getTargetPath();
+        Assert.assertFalse(fs.exists(targetPathForX).ok());
         for (String file : pus.get(0).getFileNames()) {
-            Assert.assertFalse(fs.exists(targetPath + "/" + file).ok());
+            Assert.assertFalse(fs.exists(targetPathForX + "/" + file).ok());
+        }
+        Assert.assertFalse(fs.exists(locationForParX.getWritePath()).ok());
+        String targetPathForA = locationForParA.getTargetPath();
+        for (String file : pus.get(1).getFileNames()) {
+            Assert.assertFalse(fs.exists(targetPathForA + "/" + file).ok());
         }
+        Assert.assertTrue(fs.exists(targetPathForA).ok());
+        Assert.assertFalse(fs.exists(locationForParA.getWritePath()).ok());
         // x partition will be deleted
         Assert.assertThrows(
                 "the 'x' partition should be deleted",

