[ 
https://issues.apache.org/jira/browse/HIVE-21052?focusedWorklogId=498499&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-498499
 ]

ASF GitHub Bot logged work on HIVE-21052:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Oct/20 14:12
            Start Date: 09/Oct/20 14:12
    Worklog Time Spent: 10m 
      Work Description: pvargacl commented on a change in pull request #1548:
URL: https://github.com/apache/hive/pull/1548#discussion_r501713463



##########
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
##########
@@ -853,6 +857,273 @@ public void majorCompactAfterAbort() throws Exception {
             Lists.newArrayList(5, 6), 1);
   }
 
+  @Test
+  public void testCleanAbortCompactAfterAbortTwoPartitions() throws Exception {
+    String dbName = "default";
+    String tblName = "cws";
+
+    HiveStreamingConnection connection1 = 
prepareTableTwoPartitionsAndConnection(dbName, tblName, 1);
+    HiveStreamingConnection connection2 = 
prepareTableTwoPartitionsAndConnection(dbName, tblName, 1);
+
+    connection1.beginTransaction();
+    connection1.write("1,1".getBytes());
+    connection1.write("2,2".getBytes());
+    connection1.abortTransaction();
+
+    connection2.beginTransaction();
+    connection2.write("1,3".getBytes());
+    connection2.write("2,3".getBytes());
+    connection2.write("3,3".getBytes());
+    connection2.abortTransaction();
+
+    assertAndCompactCleanAbort(dbName, tblName);
+
+    connection1.close();
+    connection2.close();
+  }
+
+  @Test
+  public void testCleanAbortCompactAfterAbort() throws Exception {
+    String dbName = "default";
+    String tblName = "cws";
+
+    // Create three folders with two different transactions
+    HiveStreamingConnection connection1 = prepareTableAndConnection(dbName, 
tblName, 1);
+    HiveStreamingConnection connection2 = prepareTableAndConnection(dbName, 
tblName, 1);
+
+    connection1.beginTransaction();
+    connection1.write("1,1".getBytes());
+    connection1.write("2,2".getBytes());
+    connection1.abortTransaction();
+
+    connection2.beginTransaction();
+    connection2.write("1,3".getBytes());
+    connection2.write("2,3".getBytes());
+    connection2.write("3,3".getBytes());
+    connection2.abortTransaction();
+
+    assertAndCompactCleanAbort(dbName, tblName);
+
+    connection1.close();
+    connection2.close();
+  }
+
+  private void assertAndCompactCleanAbort(String dbName, String tblName) 
throws Exception {
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    Table table = msClient.getTable(dbName, tblName);
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] stat =
+        fs.listStatus(new Path(table.getSd().getLocation()));
+    if (3 != stat.length) {
+      Assert.fail("Expecting three directories corresponding to three 
partitions, FileStatus[] stat " + Arrays.toString(stat));
+    }
+
+    int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
TXN_COMPONENTS where TC_OPERATION_TYPE='p'");
+    // We should have two rows corresponding to the two aborted transactions
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
TXN_COMPONENTS"), 2, count);
+
+    runInitiator(conf);
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE where CQ_TYPE='p'");
+    // Only one job is added to the queue per table. This job corresponds to 
all the entries for a particular table
+    // with rows in TXN_COMPONENTS
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
COMPACTION_QUEUE"), 1, count);
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompacts().size());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, 
rsp.getCompacts().get(0).getState());
+    Assert.assertEquals("cws", rsp.getCompacts().get(0).getTablename());
+    Assert.assertEquals(CompactionType.CLEAN_ABORTED,
+        rsp.getCompacts().get(0).getType());
+
+    runCleaner(conf);
+
+    // After the cleaner runs TXN_COMPONENTS and COMPACTION_QUEUE should have 
zero rows, also the folders should have been deleted.
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
TXN_COMPONENTS");
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
TXN_COMPONENTS"), 0, count);
+
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE");
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
COMPACTION_QUEUE"), 0, count);
+
+    RemoteIterator it =
+        fs.listFiles(new Path(table.getSd().getLocation()), true);
+    if (it.hasNext()) {
+      Assert.fail("Expecting compaction to have cleaned the directories, 
FileStatus[] stat " + Arrays.toString(stat));

Review comment:
       I think this assert is quite misleading. I might be wrong, but the 
recursive listing skips empty directories, and actually this new version of 
cleaning will keep the partition directories (as it should) and only delete 
the delta dirs and files.

##########
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
##########
@@ -853,6 +857,273 @@ public void majorCompactAfterAbort() throws Exception {
             Lists.newArrayList(5, 6), 1);
   }
 
+  @Test
+  public void testCleanAbortCompactAfterAbortTwoPartitions() throws Exception {
+    String dbName = "default";
+    String tblName = "cws";
+
+    HiveStreamingConnection connection1 = 
prepareTableTwoPartitionsAndConnection(dbName, tblName, 1);
+    HiveStreamingConnection connection2 = 
prepareTableTwoPartitionsAndConnection(dbName, tblName, 1);
+
+    connection1.beginTransaction();
+    connection1.write("1,1".getBytes());
+    connection1.write("2,2".getBytes());
+    connection1.abortTransaction();
+
+    connection2.beginTransaction();
+    connection2.write("1,3".getBytes());
+    connection2.write("2,3".getBytes());
+    connection2.write("3,3".getBytes());
+    connection2.abortTransaction();
+
+    assertAndCompactCleanAbort(dbName, tblName);
+
+    connection1.close();
+    connection2.close();
+  }
+
+  @Test
+  public void testCleanAbortCompactAfterAbort() throws Exception {
+    String dbName = "default";
+    String tblName = "cws";
+
+    // Create three folders with two different transactions
+    HiveStreamingConnection connection1 = prepareTableAndConnection(dbName, 
tblName, 1);
+    HiveStreamingConnection connection2 = prepareTableAndConnection(dbName, 
tblName, 1);
+
+    connection1.beginTransaction();
+    connection1.write("1,1".getBytes());
+    connection1.write("2,2".getBytes());
+    connection1.abortTransaction();
+
+    connection2.beginTransaction();
+    connection2.write("1,3".getBytes());
+    connection2.write("2,3".getBytes());
+    connection2.write("3,3".getBytes());
+    connection2.abortTransaction();
+
+    assertAndCompactCleanAbort(dbName, tblName);
+
+    connection1.close();
+    connection2.close();
+  }
+
+  private void assertAndCompactCleanAbort(String dbName, String tblName) 
throws Exception {
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    Table table = msClient.getTable(dbName, tblName);
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] stat =
+        fs.listStatus(new Path(table.getSd().getLocation()));
+    if (3 != stat.length) {
+      Assert.fail("Expecting three directories corresponding to three 
partitions, FileStatus[] stat " + Arrays.toString(stat));
+    }
+
+    int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
TXN_COMPONENTS where TC_OPERATION_TYPE='p'");
+    // We should have two rows corresponding to the two aborted transactions
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
TXN_COMPONENTS"), 2, count);
+
+    runInitiator(conf);
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE where CQ_TYPE='p'");
+    // Only one job is added to the queue per table. This job corresponds to 
all the entries for a particular table
+    // with rows in TXN_COMPONENTS
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
COMPACTION_QUEUE"), 1, count);
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompacts().size());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, 
rsp.getCompacts().get(0).getState());
+    Assert.assertEquals("cws", rsp.getCompacts().get(0).getTablename());
+    Assert.assertEquals(CompactionType.CLEAN_ABORTED,
+        rsp.getCompacts().get(0).getType());
+
+    runCleaner(conf);
+
+    // After the cleaner runs TXN_COMPONENTS and COMPACTION_QUEUE should have 
zero rows, also the folders should have been deleted.
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
TXN_COMPONENTS");
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
TXN_COMPONENTS"), 0, count);
+
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE");
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
COMPACTION_QUEUE"), 0, count);
+
+    RemoteIterator it =
+        fs.listFiles(new Path(table.getSd().getLocation()), true);
+    if (it.hasNext()) {
+      Assert.fail("Expecting compaction to have cleaned the directories, 
FileStatus[] stat " + Arrays.toString(stat));
+    }
+
+    rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompacts().size());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, 
rsp.getCompacts().get(0).getState());
+    Assert.assertEquals("cws", rsp.getCompacts().get(0).getTablename());
+    Assert.assertEquals(CompactionType.CLEAN_ABORTED,
+        rsp.getCompacts().get(0).getType());
+  }
+
+  @Test
+  public void testCleanAbortCompactSeveralTables() throws Exception {
+    String dbName = "default";
+    String tblName1 = "cws1";
+    String tblName2 = "cws2";
+
+    HiveStreamingConnection connection1 = prepareTableAndConnection(dbName, 
tblName1, 1);
+    HiveStreamingConnection connection2 = prepareTableAndConnection(dbName, 
tblName2, 1);
+
+    connection1.beginTransaction();
+    connection1.write("1,1".getBytes());
+    connection1.write("2,2".getBytes());
+    connection1.abortTransaction();
+
+    connection2.beginTransaction();
+    connection2.write("1,1".getBytes());
+    connection2.write("2,2".getBytes());
+    connection2.abortTransaction();
+
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    FileSystem fs = FileSystem.get(conf);
+    Table table1 = msClient.getTable(dbName, tblName1);
+    FileStatus[] stat =
+        fs.listStatus(new Path(table1.getSd().getLocation()));
+    if (2 != stat.length) {
+      Assert.fail("Expecting two directories corresponding to two partitions, 
FileStatus[] stat " + Arrays.toString(stat));
+    }
+    Table table2 = msClient.getTable(dbName, tblName2);
+    stat = fs.listStatus(new Path(table2.getSd().getLocation()));
+    if (2 != stat.length) {
+      Assert.fail("Expecting two directories corresponding to two partitions, 
FileStatus[] stat " + Arrays.toString(stat));
+    }
+
+    int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
TXN_COMPONENTS where TC_OPERATION_TYPE='p'");
+    // We should have two rows corresponding to the two aborted transactions
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
TXN_COMPONENTS"), 2, count);
+
+    runInitiator(conf);
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE where CQ_TYPE='p'");
+    // Only one job is added to the queue per table. This job corresponds to 
all the entries for a particular table
+    // with rows in TXN_COMPONENTS
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
COMPACTION_QUEUE"), 2, count);
+
+    runCleaner(conf);
+
+    // After the cleaner runs TXN_COMPONENTS and COMPACTION_QUEUE should have 
zero rows, also the folders should have been deleted.
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
TXN_COMPONENTS");
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
TXN_COMPONENTS"), 0, count);
+
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE");
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
COMPACTION_QUEUE"), 0, count);
+
+    RemoteIterator it =
+        fs.listFiles(new Path(table1.getSd().getLocation()), true);
+    if (it.hasNext()) {
+      Assert.fail("Expecting compaction to have cleaned the directories, 
FileStatus[] stat " + Arrays.toString(stat));
+    }
+
+    connection1.close();
+    connection2.close();
+  }
+
+  @Test
+  public void testCleanAbortCorrectlyCleaned() throws Exception {
+    // Test that at commit the tables are cleaned properly
+    String dbName = "default";
+    String tblName = "cws";
+    HiveStreamingConnection connection = prepareTableAndConnection(dbName, 
tblName, 1);
+    connection.beginTransaction();
+    connection.write("1,1".getBytes());
+    connection.write("2,2".getBytes());
+
+    int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
TXN_COMPONENTS where TC_OPERATION_TYPE='p'");
+    // We should have two rows corresponding to the two aborted transactions

Review comment:
       I guess this comment was copied from another test.

##########
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
##########
@@ -853,6 +857,273 @@ public void majorCompactAfterAbort() throws Exception {
             Lists.newArrayList(5, 6), 1);
   }
 
+  @Test
+  public void testCleanAbortCompactAfterAbortTwoPartitions() throws Exception {
+    String dbName = "default";
+    String tblName = "cws";
+
+    HiveStreamingConnection connection1 = 
prepareTableTwoPartitionsAndConnection(dbName, tblName, 1);
+    HiveStreamingConnection connection2 = 
prepareTableTwoPartitionsAndConnection(dbName, tblName, 1);
+
+    connection1.beginTransaction();
+    connection1.write("1,1".getBytes());
+    connection1.write("2,2".getBytes());
+    connection1.abortTransaction();
+
+    connection2.beginTransaction();
+    connection2.write("1,3".getBytes());
+    connection2.write("2,3".getBytes());
+    connection2.write("3,3".getBytes());
+    connection2.abortTransaction();
+
+    assertAndCompactCleanAbort(dbName, tblName);
+
+    connection1.close();
+    connection2.close();
+  }
+
+  @Test
+  public void testCleanAbortCompactAfterAbort() throws Exception {
+    String dbName = "default";
+    String tblName = "cws";
+
+    // Create three folders with two different transactions
+    HiveStreamingConnection connection1 = prepareTableAndConnection(dbName, 
tblName, 1);
+    HiveStreamingConnection connection2 = prepareTableAndConnection(dbName, 
tblName, 1);
+
+    connection1.beginTransaction();
+    connection1.write("1,1".getBytes());
+    connection1.write("2,2".getBytes());
+    connection1.abortTransaction();
+
+    connection2.beginTransaction();
+    connection2.write("1,3".getBytes());
+    connection2.write("2,3".getBytes());
+    connection2.write("3,3".getBytes());
+    connection2.abortTransaction();
+
+    assertAndCompactCleanAbort(dbName, tblName);
+
+    connection1.close();
+    connection2.close();
+  }
+
+  private void assertAndCompactCleanAbort(String dbName, String tblName) 
throws Exception {
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    Table table = msClient.getTable(dbName, tblName);
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] stat =
+        fs.listStatus(new Path(table.getSd().getLocation()));
+    if (3 != stat.length) {
+      Assert.fail("Expecting three directories corresponding to three 
partitions, FileStatus[] stat " + Arrays.toString(stat));
+    }
+
+    int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
TXN_COMPONENTS where TC_OPERATION_TYPE='p'");
+    // We should have two rows corresponding to the two aborted transactions
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
TXN_COMPONENTS"), 2, count);
+
+    runInitiator(conf);
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE where CQ_TYPE='p'");
+    // Only one job is added to the queue per table. This job corresponds to 
all the entries for a particular table
+    // with rows in TXN_COMPONENTS
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
COMPACTION_QUEUE"), 1, count);
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompacts().size());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, 
rsp.getCompacts().get(0).getState());
+    Assert.assertEquals("cws", rsp.getCompacts().get(0).getTablename());
+    Assert.assertEquals(CompactionType.CLEAN_ABORTED,
+        rsp.getCompacts().get(0).getType());
+
+    runCleaner(conf);
+
+    // After the cleaner runs TXN_COMPONENTS and COMPACTION_QUEUE should have 
zero rows, also the folders should have been deleted.
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
TXN_COMPONENTS");
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
TXN_COMPONENTS"), 0, count);
+
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE");
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
COMPACTION_QUEUE"), 0, count);
+
+    RemoteIterator it =
+        fs.listFiles(new Path(table.getSd().getLocation()), true);
+    if (it.hasNext()) {
+      Assert.fail("Expecting compaction to have cleaned the directories, 
FileStatus[] stat " + Arrays.toString(stat));
+    }
+
+    rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompacts().size());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, 
rsp.getCompacts().get(0).getState());
+    Assert.assertEquals("cws", rsp.getCompacts().get(0).getTablename());
+    Assert.assertEquals(CompactionType.CLEAN_ABORTED,
+        rsp.getCompacts().get(0).getType());
+  }
+
+  @Test
+  public void testCleanAbortCompactSeveralTables() throws Exception {
+    String dbName = "default";
+    String tblName1 = "cws1";
+    String tblName2 = "cws2";
+
+    HiveStreamingConnection connection1 = prepareTableAndConnection(dbName, 
tblName1, 1);
+    HiveStreamingConnection connection2 = prepareTableAndConnection(dbName, 
tblName2, 1);
+
+    connection1.beginTransaction();
+    connection1.write("1,1".getBytes());
+    connection1.write("2,2".getBytes());
+    connection1.abortTransaction();
+
+    connection2.beginTransaction();
+    connection2.write("1,1".getBytes());
+    connection2.write("2,2".getBytes());
+    connection2.abortTransaction();
+
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    FileSystem fs = FileSystem.get(conf);
+    Table table1 = msClient.getTable(dbName, tblName1);
+    FileStatus[] stat =
+        fs.listStatus(new Path(table1.getSd().getLocation()));
+    if (2 != stat.length) {
+      Assert.fail("Expecting two directories corresponding to two partitions, 
FileStatus[] stat " + Arrays.toString(stat));
+    }
+    Table table2 = msClient.getTable(dbName, tblName2);
+    stat = fs.listStatus(new Path(table2.getSd().getLocation()));
+    if (2 != stat.length) {
+      Assert.fail("Expecting two directories corresponding to two partitions, 
FileStatus[] stat " + Arrays.toString(stat));
+    }
+
+    int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
TXN_COMPONENTS where TC_OPERATION_TYPE='p'");
+    // We should have two rows corresponding to the two aborted transactions
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
TXN_COMPONENTS"), 2, count);
+
+    runInitiator(conf);
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE where CQ_TYPE='p'");
+    // Only one job is added to the queue per table. This job corresponds to 
all the entries for a particular table
+    // with rows in TXN_COMPONENTS
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
COMPACTION_QUEUE"), 2, count);
+
+    runCleaner(conf);
+
+    // After the cleaner runs TXN_COMPONENTS and COMPACTION_QUEUE should have 
zero rows, also the folders should have been deleted.
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
TXN_COMPONENTS");
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
TXN_COMPONENTS"), 0, count);
+
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE");
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
COMPACTION_QUEUE"), 0, count);
+
+    RemoteIterator it =
+        fs.listFiles(new Path(table1.getSd().getLocation()), true);
+    if (it.hasNext()) {
+      Assert.fail("Expecting compaction to have cleaned the directories, 
FileStatus[] stat " + Arrays.toString(stat));
+    }
+
+    connection1.close();
+    connection2.close();
+  }
+
+  @Test
+  public void testCleanAbortCorrectlyCleaned() throws Exception {
+    // Test that at commit the tables are cleaned properly
+    String dbName = "default";
+    String tblName = "cws";
+    HiveStreamingConnection connection = prepareTableAndConnection(dbName, 
tblName, 1);
+    connection.beginTransaction();
+    connection.write("1,1".getBytes());
+    connection.write("2,2".getBytes());
+
+    int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
TXN_COMPONENTS where TC_OPERATION_TYPE='p'");
+    // We should have two rows corresponding to the two aborted transactions
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
TXN_COMPONENTS"), 1, count);
+
+    connection.commitTransaction();
+
+    // After commit the row should have been deleted
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
TXN_COMPONENTS where TC_OPERATION_TYPE='p'");
+    // We should have two rows corresponding to the two aborted transactions
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
TXN_COMPONENTS"), 0, count);
+  }
+
+  @Test
+  public void testCleanAbortAndMinorCompact() throws Exception {
+    String dbName = "default";
+    String tblName = "cws";
+
+    HiveStreamingConnection connection = prepareTableAndConnection(dbName, 
tblName, 1);
+
+    connection.beginTransaction();
+    connection.write("1,1".getBytes());
+    connection.write("2,2".getBytes());
+    connection.abortTransaction();
+
+    executeStatementOnDriver("insert into " + tblName + " partition (a) values 
(1, '1')", driver);
+    executeStatementOnDriver("delete from " + tblName + " where b=1", driver);
+
+    conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 0);
+    runInitiator(conf);
+    runWorker(conf);
+
+    int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE");
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
COMPACTION_QUEUE"), 2, count);
+    // Cleaning should happen in threads concurrently for the minor compaction 
and the clean abort one.
+    runCleaner(conf);
+
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE");
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
COMPACTION_QUEUE"), 0, count);
+
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
TXN_COMPONENTS");
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
TXN_COMPONENTS"), 0, count);
+
+  }
+
+  private HiveStreamingConnection prepareTableAndConnection(String dbName, 
String tblName, int batchSize) throws Exception {

Review comment:
       Could you add two more tests with batch size 2? In the first one, the 
first batch writes to p1 and p2 and commits, and the second batch writes to p2 
and p3 and aborts. The other one should be the same, but with the aborted / 
committed order changed.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -97,9 +100,9 @@ public void run() {
           long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
           LOG.info("Cleaning based on min open txn id: " + minOpenTxnId);
           List<CompletableFuture> cleanerList = new ArrayList<>();
-          for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
+          for (CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
             
cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(()
 ->
-                    clean(compactionInfo, minOpenTxnId)), cleanerExecutor));
+                  clean(compactionInfo, minOpenTxnId)), cleanerExecutor));

Review comment:
       I agree, it can be addressed in a follow up Jira.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -232,6 +240,51 @@ public Object run() throws Exception {
   private static String idWatermark(CompactionInfo ci) {
     return " id=" + ci.id;
   }
+
+  private void cleanAborted(CompactionInfo ci) throws MetaException {
+    if (ci.writeIds == null || ci.writeIds.size() == 0) {
+      LOG.warn("Attempted cleaning aborted transaction with empty writeId 
list");

Review comment:
       Shouldn't you mark the compaction as failed or cleaned?

##########
File path: 
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
##########
@@ -107,11 +107,12 @@ public CompactionTxnHandler() {
         // Check for aborted txns: number of aborted txns past threshold and 
age of aborted txns
         // past time threshold
         boolean checkAbortedTimeThreshold = abortedTimeThreshold >= 0;
-        final String sCheckAborted = "SELECT \"TC_DATABASE\", \"TC_TABLE\", 
\"TC_PARTITION\","
-            + "MIN(\"TXN_STARTED\"), COUNT(*)"
+        String sCheckAborted = "SELECT \"TC_DATABASE\", \"TC_TABLE\", 
\"TC_PARTITION\", "
+            + "MIN(\"TXN_STARTED\"), COUNT(*), "
+            + "MAX(CASE WHEN \"TC_OPERATION_TYPE\" = " + OperationType.DYNPART 
+ " THEN 1 ELSE 0 END) AS \"IS_DP\" "

Review comment:
       Previously, if you had aborted txns above the threshold, this would 
generate a "normal" compaction that would clean up everything. However, now if 
you have one aborted dynpart txn, the type will be CLEAN_ABORTED, which will 
only clean the writeids belonging to p-type records and leave everything else. 
This will delay the normal cleaning. I am not sure whether that is a problem 
or not. 

##########
File path: 
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
##########
@@ -414,77 +436,56 @@ public void markCleaned(CompactionInfo info) throws 
MetaException {
          * aborted TXN_COMPONENTS above tc_writeid (and consequently about 
aborted txns).
          * See {@link ql.txn.compactor.Cleaner.removeFiles()}
          */
-        s = "SELECT DISTINCT \"TXN_ID\" FROM \"TXNS\", \"TXN_COMPONENTS\" 
WHERE \"TXN_ID\" = \"TC_TXNID\" "
-            + "AND \"TXN_STATE\" = " + TxnStatus.ABORTED + " AND 
\"TC_DATABASE\" = ? AND \"TC_TABLE\" = ?";
-        if (info.highestWriteId != 0) s += " AND \"TC_WRITEID\" <= ?";
-        if (info.partName != null) s += " AND \"TC_PARTITION\" = ?";
+        List<String> queries = new ArrayList<>();
+        Iterator<Long> writeIdsIter = null;
+        List<Integer> counts = null;
 
-        pStmt = dbConn.prepareStatement(s);
-        paramCount = 1;
-        pStmt.setString(paramCount++, info.dbname);
-        pStmt.setString(paramCount++, info.tableName);
-        if(info.highestWriteId != 0) {
-          pStmt.setLong(paramCount++, info.highestWriteId);
+        s = "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" IN (" +
+          "   SELECT \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + 
TxnStatus.ABORTED + ") " +

Review comment:
       cool




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 498499)
    Time Spent: 10h 50m  (was: 10h 40m)

> Make sure transactions get cleaned if they are aborted before addPartitions 
> is called
> -------------------------------------------------------------------------------------
>
>                 Key: HIVE-21052
>                 URL: https://issues.apache.org/jira/browse/HIVE-21052
>             Project: Hive
>          Issue Type: Bug
>          Components: Transactions
>    Affects Versions: 3.0.0, 3.1.1
>            Reporter: Jaume M
>            Assignee: Jaume M
>            Priority: Critical
>              Labels: pull-request-available
>         Attachments: Aborted Txn w_Direct Write.pdf, HIVE-21052.1.patch, 
> HIVE-21052.10.patch, HIVE-21052.11.patch, HIVE-21052.12.patch, 
> HIVE-21052.2.patch, HIVE-21052.3.patch, HIVE-21052.4.patch, 
> HIVE-21052.5.patch, HIVE-21052.6.patch, HIVE-21052.7.patch, 
> HIVE-21052.8.patch, HIVE-21052.9.patch
>
>          Time Spent: 10h 50m
>  Remaining Estimate: 0h
>
> If the transaction is aborted between openTxn and addPartitions and data has 
> been written on the table the transaction manager will think it's an empty 
> transaction and no cleaning will be done.
> This is currently an issue in the streaming API and in micromanaged tables. 
> As proposed by [~ekoifman] this can be solved by:
> * Writing an entry with a special marker to TXN_COMPONENTS at openTxn and 
> when addPartitions is called remove this entry from TXN_COMPONENTS and add 
> the corresponding partition entry to TXN_COMPONENTS.
> * If the cleaner finds an entry with a special marker in TXN_COMPONENTS that 
> specifies that a transaction was opened and it was aborted, it must generate 
> jobs for the worker for every possible partition available.
> cc [~ewohlstadter]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to