This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 734520a77b1fc48adf24a2dba8533a85b143c885 Author: wuwenchi <wuwenchi...@hotmail.com> AuthorDate: Fri Apr 19 11:25:12 2024 +0800 [bugfix](hive)delete write path after hive insert (#33798) Issue #31442 1. delete file according query id 2. delete write path after insert --- .../doris/datasource/hive/HMSTransaction.java | 59 ++++--- .../commands/insert/HiveInsertCommandContext.java | 18 +++ .../plans/commands/insert/HiveInsertExecutor.java | 9 ++ .../commands/insert/InsertIntoTableCommand.java | 3 +- .../org/apache/doris/planner/HiveTableSink.java | 1 + .../doris/datasource/hive/HmsCommitTest.java | 173 +++++++++++++++++---- 6 files changed, 215 insertions(+), 48 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java index 7a1d4389096..32dd083c2ad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java @@ -27,6 +27,7 @@ import org.apache.doris.common.profile.SummaryProfile; import org.apache.doris.fs.FileSystem; import org.apache.doris.fs.FileSystemUtil; import org.apache.doris.fs.remote.RemoteFile; +import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.THivePartitionUpdate; import org.apache.doris.thrift.TUpdateMode; @@ -72,6 +73,7 @@ public class HMSTransaction implements Transaction { private String dbName; private String tbName; private Optional<SummaryProfile> summaryProfile = Optional.empty(); + private String queryId; private final Map<DatabaseTableName, Action<TableAndMore>> tableActions = new HashMap<>(); private final Map<DatabaseTableName, Map<List<String>, Action<PartitionAndMore>>> @@ -79,6 +81,7 @@ public class HMSTransaction implements Transaction { private HmsCommitter hmsCommitter; private List<THivePartitionUpdate> hivePartitionUpdates = Lists.newArrayList(); + private String declaredIntentionsToWrite; public HMSTransaction(HiveMetadataOps hiveOps) { this.hiveOps = hiveOps; @@ -124,6 +127,11 @@ public class HMSTransaction implements Transaction { } } + public void beginInsertTable(HiveInsertCommandContext ctx) { + declaredIntentionsToWrite = ctx.getWritePath(); + queryId = ctx.getQueryId(); + } + public void finishInsertTable(String dbName, String tbName) { this.dbName = dbName; this.tbName = tbName; @@ -239,8 +247,12 @@ public class HMSTransaction implements Transaction { hmsCommitter.doCommit(); } catch (Throwable t) { LOG.warn("Failed to commit for {}.{}, abort it.", dbName, tbName); - hmsCommitter.abort(); - hmsCommitter.rollback(); + try { + hmsCommitter.abort(); + hmsCommitter.rollback(); + } catch (RuntimeException e) { + t.addSuppressed(new Exception("Failed to roll back after commit failure", e)); + } throw t; } finally { hmsCommitter.runClearPathsForFinish(); @@ -560,10 +572,10 @@ public class HMSTransaction implements Transaction { return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build()); } - return doRecursiveDeleteFiles(directory, deleteEmptyDir); + return doRecursiveDeleteFiles(directory, deleteEmptyDir, queryId); } - private DeleteRecursivelyResult doRecursiveDeleteFiles(Path directory, boolean deleteEmptyDir) { + private DeleteRecursivelyResult doRecursiveDeleteFiles(Path directory, boolean deleteEmptyDir, String queryId) { List<RemoteFile> allFiles = new ArrayList<>(); Set<String> allDirs = new HashSet<>(); Status statusFile = fs.listFiles(directory.toString(), true, allFiles); @@ -577,15 +589,18 @@ public class HMSTransaction implements Transaction { boolean allDescendentsDeleted = true; ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder(); for (RemoteFile file : allFiles) { - String fileName = file.getName(); - if (!deleteIfExists(new Path(fileName))) { + if (file.getName().startsWith(queryId)) { + if (!deleteIfExists(file.getPath())) { + allDescendentsDeleted = false; + notDeletedEligibleItems.add(file.getPath().toString()); + } + } else { allDescendentsDeleted = false; - notDeletedEligibleItems.add(fileName); } } for (String dir : allDirs) { - DeleteRecursivelyResult subResult = doRecursiveDeleteFiles(new Path(dir), deleteEmptyDir); + DeleteRecursivelyResult subResult = doRecursiveDeleteFiles(new Path(dir), deleteEmptyDir, queryId); if (!subResult.dirNotExists()) { allDescendentsDeleted = false; } @@ -1038,6 +1053,9 @@ public class HMSTransaction implements Transaction { } private void undoAddPartitionsTask() { + if (addPartitionsTask.isEmpty()) { + return; + } HivePartition firstPartition = addPartitionsTask.getPartitions().get(0).getPartition(); String dbName = firstPartition.getDbName(); @@ -1319,6 +1337,10 @@ public class HMSTransaction implements Transaction { summaryProfile.ifPresent(SummaryProfile::setHmsUpdatePartitionTime); } + public void pruneAndDeleteStagingDirectories() { + recursiveDeleteItems(new Path(declaredIntentionsToWrite), true); + } + public void doNothing() { // do nothing // only for regression test and unit test to throw exception @@ -1342,6 +1364,7 @@ public class HMSTransaction implements Transaction { public void rollback() { //delete write path + pruneAndDeleteStagingDirectories(); } } @@ -1372,22 +1395,22 @@ public class HMSTransaction implements Transaction { } public void wrapperAsyncRenameWithProfileSummary(Executor executor, - List<CompletableFuture<?>> renameFileFutures, - AtomicBoolean cancelled, - String origFilePath, - String destFilePath, - List<String> fileNames) { + List<CompletableFuture<?>> renameFileFutures, + AtomicBoolean cancelled, + String origFilePath, + String destFilePath, + List<String> fileNames) { FileSystemUtil.asyncRenameFiles( fs, executor, renameFileFutures, cancelled, origFilePath, destFilePath, fileNames); summaryProfile.ifPresent(profile -> profile.addRenameFileCnt(fileNames.size())); } public void wrapperAsyncRenameDirWithProfileSummary(Executor executor, - List<CompletableFuture<?>> renameFileFutures, - AtomicBoolean cancelled, - String origFilePath, - String destFilePath, - Runnable runWhenPathNotExist) { + List<CompletableFuture<?>> renameFileFutures, + AtomicBoolean cancelled, + String origFilePath, + String destFilePath, + Runnable runWhenPathNotExist) { FileSystemUtil.asyncRenameDir( fs, executor, renameFileFutures, cancelled, origFilePath, destFilePath, runWhenPathNotExist); summaryProfile.ifPresent(SummaryProfile::incRenameDirCnt); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java index 31e56fd6ccc..49d5a12c2bb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java @@ -22,6 +22,8 @@ package org.apache.doris.nereids.trees.plans.commands.insert; */ public class HiveInsertCommandContext extends InsertCommandContext { private boolean overwrite = false; + private String writePath; + private String queryId; public boolean isOverwrite() { return overwrite; @@ -30,4 +32,20 @@ public class HiveInsertCommandContext extends InsertCommandContext { public void setOverwrite(boolean overwrite) { this.overwrite = overwrite; } + + public String getWritePath() { + return writePath; + } + + public void setWritePath(String writePath) { + this.writePath = writePath; + } + + public String getQueryId() { + return queryId; + } + + public void setQueryId(String queryId) { + this.queryId = queryId; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java index 16236c340a6..9421af79503 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java @@ -35,10 +35,12 @@ import org.apache.doris.planner.PlanFragment; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QueryState; import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.TransactionManager; import org.apache.doris.transaction.TransactionStatus; import org.apache.doris.transaction.TransactionType; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -97,6 +99,13 @@ public class HiveInsertExecutor extends AbstractInsertExecutor { @Override protected void beforeExec() { // check params + HMSTransaction transaction = (HMSTransaction) transactionManager.getTransaction(txnId); + Preconditions.checkArgument(insertCtx.isPresent(), "insert context must be present"); + HiveInsertCommandContext ctx = (HiveInsertCommandContext) insertCtx.get(); + TUniqueId tUniqueId = ConnectContext.get().queryId(); + Preconditions.checkArgument(tUniqueId != null, "query id shouldn't be null"); + ctx.setQueryId(DebugUtil.printId(tUniqueId)); + transaction.beginInsertTable(ctx); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 61024345c06..7f326cf2212 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -173,7 +173,8 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, .setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode); } else if (physicalSink instanceof PhysicalHiveTableSink) { HMSExternalTable hiveExternalTable = (HMSExternalTable) targetTableIf; - insertExecutor = new HiveInsertExecutor(ctx, hiveExternalTable, label, planner, insertCtx); + insertExecutor = new HiveInsertExecutor(ctx, hiveExternalTable, label, planner, + Optional.of(insertCtx.orElse((new HiveInsertCommandContext())))); // set hive query options } else { // TODO: support other table types diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java index a4fa1d11cb0..c45bdc1ebc9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java @@ -141,6 +141,7 @@ public class HiveTableSink extends DataSink { if (insertCtx.isPresent()) { HiveInsertCommandContext context = (HiveInsertCommandContext) insertCtx.get(); tSink.setOverwrite(context.isOverwrite()); + context.setWritePath(writeTempPath); } tDataSink = new TDataSink(getDataSinkType()); tDataSink.setHiveTableSink(tSink); diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java index dedc7738c86..ba87dd8f48e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java @@ -19,11 +19,14 @@ package org.apache.doris.datasource.hive; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.common.util.DebugUtil; import org.apache.doris.datasource.TestHMSCachedClient; import org.apache.doris.fs.LocalDfsFileSystem; +import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.THiveLocationParams; import org.apache.doris.thrift.THivePartitionUpdate; +import org.apache.doris.thrift.TUniqueId; import org.apache.doris.thrift.TUpdateMode; import com.google.common.collect.Lists; @@ -47,7 +50,11 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Optional; +import java.util.Random; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; public class HmsCommitTest { @@ -61,6 +68,7 @@ public class HmsCommitTest { static String writeLocation; static String uri = "thrift://127.0.0.1:9083"; static boolean hasRealHmsService = false; + private ConnectContext connectContext; @BeforeClass public static void beforeClass() throws Throwable { @@ -70,10 +78,6 @@ public class HmsCommitTest { writeLocation = "file://" + writePath.toAbsolutePath() + "/"; createTestHiveCatalog(); createTestHiveDatabase(); - - // context - ConnectContext connectContext = new ConnectContext(); - connectContext.setThreadLocalInfo(); } @AfterClass @@ -114,17 +118,21 @@ public class HmsCommitTest { partitionKeys.add("c3"); String fileFormat = "orc"; HiveTableMetadata tableMetadata = new HiveTableMetadata( - dbName, tbWithPartition, Optional.of(dbLocation + tbWithPartition), + dbName, tbWithPartition, Optional.of(dbLocation + tbWithPartition + UUID.randomUUID()), columns, partitionKeys, new HashMap<>(), fileFormat, ""); hmsClient.createTable(tableMetadata, true); // create table for tbWithoutPartition HiveTableMetadata tableMetadata2 = new HiveTableMetadata( - dbName, tbWithoutPartition, Optional.of(dbLocation + tbWithPartition), + dbName, tbWithoutPartition, Optional.of(dbLocation + tbWithPartition + UUID.randomUUID()), columns, new ArrayList<>(), new HashMap<>(), fileFormat, ""); hmsClient.createTable(tableMetadata2, true); + + // context + connectContext = new ConnectContext(); + connectContext.setThreadLocalInfo(); } @After @@ -142,6 +150,8 @@ public class HmsCommitTest { @Test public void testAppendPartitionForUnPartitionedTable() throws IOException { + genQueryID(); + System.out.println(DebugUtil.printId(connectContext.queryId())); List<THivePartitionUpdate> pus = new ArrayList<>(); pus.add(createRandomAppend(null)); pus.add(createRandomAppend(null)); @@ -150,6 +160,8 @@ public class HmsCommitTest { Table table = hmsClient.getTable(dbName, tbWithoutPartition); assertNumRows(3, table); + genQueryID(); + System.out.println(DebugUtil.printId(connectContext.queryId())); List<THivePartitionUpdate> pus2 = new ArrayList<>(); pus2.add(createRandomAppend(null)); pus2.add(createRandomAppend(null)); @@ -162,6 +174,8 @@ public class HmsCommitTest { @Test public void testOverwritePartitionForUnPartitionedTable() throws IOException { testAppendPartitionForUnPartitionedTable(); + + genQueryID(); List<THivePartitionUpdate> pus = new ArrayList<>(); pus.add(createRandomOverwrite(null)); pus.add(createRandomOverwrite(null)); @@ -173,6 +187,7 @@ public class HmsCommitTest { @Test public void testNewPartitionForPartitionedTable() throws IOException { + genQueryID(); List<THivePartitionUpdate> pus = new ArrayList<>(); pus.add(createRandomNew("a")); pus.add(createRandomNew("a")); @@ -194,6 +209,7 @@ public class HmsCommitTest { public void testAppendPartitionForPartitionedTable() throws IOException { testNewPartitionForPartitionedTable(); + genQueryID(); List<THivePartitionUpdate> pus = new ArrayList<>(); pus.add(createRandomAppend("a")); pus.add(createRandomAppend("a")); @@ -214,6 +230,8 @@ public class HmsCommitTest { @Test public void testOverwritePartitionForPartitionedTable() throws IOException { testAppendPartitionForPartitionedTable(); + + genQueryID(); List<THivePartitionUpdate> pus = new ArrayList<>(); pus.add(createRandomOverwrite("a")); pus.add(createRandomOverwrite("b")); @@ -230,6 +248,7 @@ public class HmsCommitTest { @Test public void testNewManyPartitionForPartitionedTable() throws IOException { + genQueryID(); List<THivePartitionUpdate> pus = new ArrayList<>(); int nums = 150; for (int i = 0; i < nums; i++) { @@ -242,6 +261,7 @@ public class HmsCommitTest { assertNumRows(1, p); } + genQueryID(); try { commit(dbName, tbWithPartition, Collections.singletonList(createRandomNew("1"))); } catch (Exception e) { @@ -254,6 +274,7 @@ public class HmsCommitTest { // first add three partition: a,b,c testNewPartitionForPartitionedTable(); + genQueryID(); // second append two partition: a,x // but there is no 'x' partition in the previous table, so when verifying based on HMS, // it will throw exception @@ -281,12 +302,17 @@ public class HmsCommitTest { public THivePartitionUpdate genOnePartitionUpdate(String partitionValue, TUpdateMode mode) throws IOException { - String uuid = UUID.randomUUID().toString(); + String queryId = ""; + if (connectContext.queryId() != null) { + queryId = DebugUtil.printId(connectContext.queryId()); + } + THiveLocationParams location = new THiveLocationParams(); - String targetPath = dbLocation + uuid + "/" + partitionValue; + String targetPath = dbLocation + queryId + "/" + partitionValue; location.setTargetPath(targetPath); - location.setWritePath(writeLocation + partitionValue); + String writePath = writeLocation + queryId + "/" + partitionValue; + location.setWritePath(writePath); THivePartitionUpdate pu = new THivePartitionUpdate(); if (partitionValue != null) { @@ -296,9 +322,10 @@ public class HmsCommitTest { pu.setRowCount(1); pu.setFileSize(1); pu.setLocation(location); - String f1 = uuid + "f1"; - String f2 = uuid + "f2"; - String f3 = uuid + "f3"; + String uuid = UUID.randomUUID().toString(); + String f1 = queryId + "_" + uuid + "_f1.orc"; + String f2 = queryId + "_" + uuid + "_f2.orc"; + String f3 = queryId + "_" + uuid + "_f3.orc"; pu.setFileNames(new ArrayList<String>() { { @@ -312,9 +339,9 @@ public class HmsCommitTest { fs.makeDir(targetPath); } - fs.createFile(writeLocation + partitionValue + "/" + f1); - fs.createFile(writeLocation + partitionValue + "/" + f2); - fs.createFile(writeLocation + partitionValue + "/" + f3); + fs.createFile(writePath + "/" + f1); + fs.createFile(writePath + "/" + f2); + fs.createFile(writePath + "/" + f3); return pu; } @@ -338,6 +365,11 @@ public class HmsCommitTest { List<THivePartitionUpdate> hivePUs) { HMSTransaction hmsTransaction = new HMSTransaction(hmsOps); hmsTransaction.setHivePartitionUpdates(hivePUs); + HiveInsertCommandContext ctx = new HiveInsertCommandContext(); + String queryId = DebugUtil.printId(ConnectContext.get().queryId()); + ctx.setQueryId(queryId); + ctx.setWritePath(writeLocation + queryId + "/"); + hmsTransaction.beginInsertTable(ctx); hmsTransaction.finishInsertTable(dbName, tableName); hmsTransaction.commit(); } @@ -352,6 +384,21 @@ public class HmsCommitTest { }; } + public void mockAsyncRenameDir(Runnable runnable) { + new MockUp<HMSTransaction>(HMSTransaction.class) { + @Mock + private void wrapperAsyncRenameDirWithProfileSummary(Executor executor, + List<CompletableFuture<?>> renameFileFutures, + AtomicBoolean cancelled, + String origFilePath, + String destFilePath, + Runnable runWhenPathNotExist) { + runnable.run(); + throw new RuntimeException("failed to rename dir"); + } + }; + } + public void mockDoOther(Runnable runnable) { new MockUp<HMSTransaction.HmsCommitter>(HMSTransaction.HmsCommitter.class) { @Mock @@ -372,8 +419,49 @@ public class HmsCommitTest { }; } + public void genQueryID() { + connectContext.setQueryId(new TUniqueId(new Random().nextInt(), new Random().nextInt())); + } + + @Test + public void testRollbackWritePath() throws IOException { + genQueryID(); + List<THivePartitionUpdate> pus = new ArrayList<>(); + pus.add(createRandomNew("a")); + + THiveLocationParams location = pus.get(0).getLocation(); + + // For new partition, there should be no target path + Assert.assertFalse(fs.exists(location.getTargetPath()).ok()); + Assert.assertTrue(fs.exists(location.getWritePath()).ok()); + + mockAsyncRenameDir(() -> { + // commit will be failed, and it will remain some files in write path + String writePath = location.getWritePath(); + Assert.assertTrue(fs.exists(writePath).ok()); + for (String file : pus.get(0).getFileNames()) { + Assert.assertTrue(fs.exists(writePath + "/" + file).ok()); + } + }); + + try { + commit(dbName, tbWithPartition, pus); + Assert.assertTrue(false); + } catch (Exception e) { + // ignore + } + + // After rollback, these files in write path will be deleted + String writePath = location.getWritePath(); + Assert.assertFalse(fs.exists(writePath).ok()); + for (String file : pus.get(0).getFileNames()) { + Assert.assertFalse(fs.exists(writePath + "/" + file).ok()); + } + } + @Test public void testRollbackNewPartitionForPartitionedTableForFilesystem() throws IOException { + genQueryID(); List<THivePartitionUpdate> pus = new ArrayList<>(); pus.add(createRandomNew("a")); @@ -413,6 +501,8 @@ public class HmsCommitTest { // first create three partitions: a,b,c testNewPartitionForPartitionedTable(); + genQueryID(); + // second add 'new partition' for 'x' // add 'append partition' for 'a' // when 'doCommit', 'new partition' will be executed before 'append partition' @@ -421,15 +511,16 @@ public class HmsCommitTest { pus.add(createRandomNew("x")); pus.add(createRandomAppend("a")); - THiveLocationParams location = pus.get(0).getLocation(); + THiveLocationParams locationForX = pus.get(0).getLocation(); + THiveLocationParams locationForA = pus.get(0).getLocation(); // For new partition, there should be no target path - Assert.assertFalse(fs.exists(location.getTargetPath()).ok()); - Assert.assertTrue(fs.exists(location.getWritePath()).ok()); + Assert.assertFalse(fs.exists(locationForX.getTargetPath()).ok()); + Assert.assertTrue(fs.exists(locationForX.getWritePath()).ok()); mockUpdateStatisticsTaskException(() -> { // When the commit is completed, these files should be renamed successfully - String targetPath = location.getTargetPath(); + String targetPath = locationForX.getTargetPath(); Assert.assertTrue(fs.exists(targetPath).ok()); for (String file : pus.get(0).getFileNames()) { Assert.assertTrue(fs.exists(targetPath + "/" + file).ok()); @@ -448,11 +539,13 @@ public class HmsCommitTest { } // After rollback, these files will be deleted - String targetPath = location.getTargetPath(); + String targetPath = locationForX.getTargetPath(); Assert.assertFalse(fs.exists(targetPath).ok()); for (String file : pus.get(0).getFileNames()) { Assert.assertFalse(fs.exists(targetPath + "/" + file).ok()); } + Assert.assertFalse(fs.exists(locationForX.getWritePath()).ok()); + Assert.assertFalse(fs.exists(locationForA.getWritePath()).ok()); // x partition will be deleted Assert.assertThrows( "the 'x' partition should be deleted", @@ -466,24 +559,39 @@ public class HmsCommitTest { // first create three partitions: a,b,c testNewPartitionForPartitionedTable(); + genQueryID(); // second add 'new partition' for 'x' // add 'append partition' for 'a' List<THivePartitionUpdate> pus = new ArrayList<>(); pus.add(createRandomNew("x")); pus.add(createRandomAppend("a")); - THiveLocationParams location = pus.get(0).getLocation(); + THiveLocationParams locationForParX = pus.get(0).getLocation(); + // in test, targetPath is a random path + // but when appending a partition, it uses the location of the original partition as the targetPath + // so here we need to get the path of partition a + Partition a = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a")); + String location = a.getSd().getLocation(); + pus.get(1).getLocation().setTargetPath(location); + THiveLocationParams locationForParA = pus.get(1).getLocation(); // For new partition, there should be no target path - Assert.assertFalse(fs.exists(location.getTargetPath()).ok()); - Assert.assertTrue(fs.exists(location.getWritePath()).ok()); + Assert.assertFalse(fs.exists(locationForParX.getTargetPath()).ok()); + Assert.assertTrue(fs.exists(locationForParX.getWritePath()).ok()); + + // For exist partition + Assert.assertTrue(fs.exists(locationForParA.getTargetPath()).ok()); mockDoOther(() -> { // When the commit is completed, these files should be renamed successfully - String targetPath = location.getTargetPath(); - Assert.assertTrue(fs.exists(targetPath).ok()); + String targetPathForX = locationForParX.getTargetPath(); + Assert.assertTrue(fs.exists(targetPathForX).ok()); for (String file : pus.get(0).getFileNames()) { - Assert.assertTrue(fs.exists(targetPath + "/" + file).ok()); + Assert.assertTrue(fs.exists(targetPathForX + "/" + file).ok()); + } + String targetPathForA = locationForParA.getTargetPath(); + for (String file : pus.get(1).getFileNames()) { + Assert.assertTrue(fs.exists(targetPathForA + "/" + file).ok()); } // new partition will be executed, // so, we can get the new partition @@ -503,11 +611,18 @@ public class HmsCommitTest { } // After rollback, these files will be deleted - String targetPath = location.getTargetPath(); - Assert.assertFalse(fs.exists(targetPath).ok()); + String targetPathForX = locationForParX.getTargetPath(); + Assert.assertFalse(fs.exists(targetPathForX).ok()); for (String file : pus.get(0).getFileNames()) { - Assert.assertFalse(fs.exists(targetPath + "/" + file).ok()); + Assert.assertFalse(fs.exists(targetPathForX + "/" + file).ok()); + } + Assert.assertFalse(fs.exists(locationForParX.getWritePath()).ok()); + String targetPathForA = locationForParA.getTargetPath(); + for (String file : pus.get(1).getFileNames()) { + Assert.assertFalse(fs.exists(targetPathForA + "/" + file).ok()); } + Assert.assertTrue(fs.exists(targetPathForA).ok()); + Assert.assertFalse(fs.exists(locationForParA.getWritePath()).ok()); // x partition will be deleted Assert.assertThrows( "the 'x' partition should be deleted", --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org