[ 
https://issues.apache.org/jira/browse/HIVE-21052?focusedWorklogId=497325&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-497325
 ]

ASF GitHub Bot logged work on HIVE-21052:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Oct/20 13:19
            Start Date: 08/Oct/20 13:19
    Worklog Time Spent: 10m 
      Work Description: pvargacl commented on a change in pull request #1548:
URL: https://github.com/apache/hive/pull/1548#discussion_r501713463



##########
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
##########
@@ -853,6 +857,273 @@ public void majorCompactAfterAbort() throws Exception {
             Lists.newArrayList(5, 6), 1);
   }
 
+  @Test
+  public void testCleanAbortCompactAfterAbortTwoPartitions() throws Exception {
+    String dbName = "default";
+    String tblName = "cws";
+
+    HiveStreamingConnection connection1 = 
prepareTableTwoPartitionsAndConnection(dbName, tblName, 1);
+    HiveStreamingConnection connection2 = 
prepareTableTwoPartitionsAndConnection(dbName, tblName, 1);
+
+    connection1.beginTransaction();
+    connection1.write("1,1".getBytes());
+    connection1.write("2,2".getBytes());
+    connection1.abortTransaction();
+
+    connection2.beginTransaction();
+    connection2.write("1,3".getBytes());
+    connection2.write("2,3".getBytes());
+    connection2.write("3,3".getBytes());
+    connection2.abortTransaction();
+
+    assertAndCompactCleanAbort(dbName, tblName);
+
+    connection1.close();
+    connection2.close();
+  }
+
+  @Test
+  public void testCleanAbortCompactAfterAbort() throws Exception {
+    String dbName = "default";
+    String tblName = "cws";
+
+    // Create three folders with two different transactions
+    HiveStreamingConnection connection1 = prepareTableAndConnection(dbName, 
tblName, 1);
+    HiveStreamingConnection connection2 = prepareTableAndConnection(dbName, 
tblName, 1);
+
+    connection1.beginTransaction();
+    connection1.write("1,1".getBytes());
+    connection1.write("2,2".getBytes());
+    connection1.abortTransaction();
+
+    connection2.beginTransaction();
+    connection2.write("1,3".getBytes());
+    connection2.write("2,3".getBytes());
+    connection2.write("3,3".getBytes());
+    connection2.abortTransaction();
+
+    assertAndCompactCleanAbort(dbName, tblName);
+
+    connection1.close();
+    connection2.close();
+  }
+
+  private void assertAndCompactCleanAbort(String dbName, String tblName) 
throws Exception {
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    Table table = msClient.getTable(dbName, tblName);
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] stat =
+        fs.listStatus(new Path(table.getSd().getLocation()));
+    if (3 != stat.length) {
+      Assert.fail("Expecting three directories corresponding to three 
partitions, FileStatus[] stat " + Arrays.toString(stat));
+    }
+
+    int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
TXN_COMPONENTS where TC_OPERATION_TYPE='p'");
+    // We should have two rows corresponding to the two aborted transactions
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
TXN_COMPONENTS"), 2, count);
+
+    runInitiator(conf);
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE where CQ_TYPE='p'");
+    // Only one job is added to the queue per table. This job corresponds to 
all the entries for a particular table
+    // with rows in TXN_COMPONENTS
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
COMPACTION_QUEUE"), 1, count);
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompacts().size());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, 
rsp.getCompacts().get(0).getState());
+    Assert.assertEquals("cws", rsp.getCompacts().get(0).getTablename());
+    Assert.assertEquals(CompactionType.CLEAN_ABORTED,
+        rsp.getCompacts().get(0).getType());
+
+    runCleaner(conf);
+
+    // After the cleaner runs TXN_COMPONENTS and COMPACTION_QUEUE should have 
zero rows, also the folders should have been deleted.
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
TXN_COMPONENTS");
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
TXN_COMPONENTS"), 0, count);
+
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE");
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from 
COMPACTION_QUEUE"), 0, count);
+
+    RemoteIterator it =
+        fs.listFiles(new Path(table.getSd().getLocation()), true);
+    if (it.hasNext()) {
+      Assert.fail("Expecting compaction to have cleaned the directories, 
FileStatus[] stat " + Arrays.toString(stat));

Review comment:
       I think this assert is quite misleading. I might be wrong, but the 
recursive listing skips empty directories, and this new version of 
cleaning will keep the partition directories (as it should) and only delete 
the delta dirs and files.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 497325)
    Time Spent: 8h  (was: 7h 50m)

> Make sure transactions get cleaned if they are aborted before addPartitions 
> is called
> -------------------------------------------------------------------------------------
>
>                 Key: HIVE-21052
>                 URL: https://issues.apache.org/jira/browse/HIVE-21052
>             Project: Hive
>          Issue Type: Bug
>          Components: Transactions
>    Affects Versions: 3.0.0, 3.1.1
>            Reporter: Jaume M
>            Assignee: Jaume M
>            Priority: Critical
>              Labels: pull-request-available
>         Attachments: Aborted Txn w_Direct Write.pdf, HIVE-21052.1.patch, 
> HIVE-21052.10.patch, HIVE-21052.11.patch, HIVE-21052.12.patch, 
> HIVE-21052.2.patch, HIVE-21052.3.patch, HIVE-21052.4.patch, 
> HIVE-21052.5.patch, HIVE-21052.6.patch, HIVE-21052.7.patch, 
> HIVE-21052.8.patch, HIVE-21052.9.patch
>
>          Time Spent: 8h
>  Remaining Estimate: 0h
>
> If the transaction is aborted between openTxn and addPartitions, and data 
> has been written on the table, the transaction manager will think it's an 
> empty transaction and no cleaning will be done.
> This is currently an issue in the streaming API and in micromanaged tables. 
> As proposed by [~ekoifman] this can be solved by:
> * Writing an entry with a special marker to TXN_COMPONENTS at openTxn and, 
> when addPartitions is called, removing this entry from TXN_COMPONENTS and 
> adding the corresponding partition entry to TXN_COMPONENTS.
> * If the cleaner finds an entry with a special marker in TXN_COMPONENTS that 
> specifies that a transaction was opened and then aborted, it must generate 
> jobs for the worker for every possible partition available.
> cc [~ewohlstadter]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to