[ https://issues.apache.org/jira/browse/HIVE-21052?focusedWorklogId=498499&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-498499 ]
ASF GitHub Bot logged work on HIVE-21052: ----------------------------------------- Author: ASF GitHub Bot Created on: 09/Oct/20 14:12 Start Date: 09/Oct/20 14:12 Worklog Time Spent: 10m Work Description: pvargacl commented on a change in pull request #1548: URL: https://github.com/apache/hive/pull/1548#discussion_r501713463 ########## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java ########## @@ -853,6 +857,273 @@ public void majorCompactAfterAbort() throws Exception { Lists.newArrayList(5, 6), 1); } + @Test + public void testCleanAbortCompactAfterAbortTwoPartitions() throws Exception { + String dbName = "default"; + String tblName = "cws"; + + HiveStreamingConnection connection1 = prepareTableTwoPartitionsAndConnection(dbName, tblName, 1); + HiveStreamingConnection connection2 = prepareTableTwoPartitionsAndConnection(dbName, tblName, 1); + + connection1.beginTransaction(); + connection1.write("1,1".getBytes()); + connection1.write("2,2".getBytes()); + connection1.abortTransaction(); + + connection2.beginTransaction(); + connection2.write("1,3".getBytes()); + connection2.write("2,3".getBytes()); + connection2.write("3,3".getBytes()); + connection2.abortTransaction(); + + assertAndCompactCleanAbort(dbName, tblName); + + connection1.close(); + connection2.close(); + } + + @Test + public void testCleanAbortCompactAfterAbort() throws Exception { + String dbName = "default"; + String tblName = "cws"; + + // Create three folders with two different transactions + HiveStreamingConnection connection1 = prepareTableAndConnection(dbName, tblName, 1); + HiveStreamingConnection connection2 = prepareTableAndConnection(dbName, tblName, 1); + + connection1.beginTransaction(); + connection1.write("1,1".getBytes()); + connection1.write("2,2".getBytes()); + connection1.abortTransaction(); + + connection2.beginTransaction(); + connection2.write("1,3".getBytes()); + connection2.write("2,3".getBytes()); + 
connection2.write("3,3".getBytes()); + connection2.abortTransaction(); + + assertAndCompactCleanAbort(dbName, tblName); + + connection1.close(); + connection2.close(); + } + + private void assertAndCompactCleanAbort(String dbName, String tblName) throws Exception { + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + Table table = msClient.getTable(dbName, tblName); + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = + fs.listStatus(new Path(table.getSd().getLocation())); + if (3 != stat.length) { + Assert.fail("Expecting three directories corresponding to three partitions, FileStatus[] stat " + Arrays.toString(stat)); + } + + int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where TC_OPERATION_TYPE='p'"); + // We should have two rows corresponding to the two aborted transactions + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 2, count); + + runInitiator(conf); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_TYPE='p'"); + // Only one job is added to the queue per table. This job corresponds to all the entries for a particular table + // with rows in TXN_COMPONENTS + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 1, count); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals(1, rsp.getCompacts().size()); + Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState()); + Assert.assertEquals("cws", rsp.getCompacts().get(0).getTablename()); + Assert.assertEquals(CompactionType.CLEAN_ABORTED, + rsp.getCompacts().get(0).getType()); + + runCleaner(conf); + + // After the cleaner runs TXN_COMPONENTS and COMPACTION_QUEUE should have zero rows, also the folders should have been deleted. 
+ count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 0, count); + + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 0, count); + + RemoteIterator it = + fs.listFiles(new Path(table.getSd().getLocation()), true); + if (it.hasNext()) { + Assert.fail("Expecting compaction to have cleaned the directories, FileStatus[] stat " + Arrays.toString(stat)); Review comment: I think this assert is quite misleading. I might be wrong, but the recursive listing skips empty directories, and actually this new version of cleaning will keep the partition directories (it should) and only delete the delta dirs and files. ########## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java ########## @@ -853,6 +857,273 @@ public void majorCompactAfterAbort() throws Exception { Lists.newArrayList(5, 6), 1); } + @Test + public void testCleanAbortCompactAfterAbortTwoPartitions() throws Exception { + String dbName = "default"; + String tblName = "cws"; + + HiveStreamingConnection connection1 = prepareTableTwoPartitionsAndConnection(dbName, tblName, 1); + HiveStreamingConnection connection2 = prepareTableTwoPartitionsAndConnection(dbName, tblName, 1); + + connection1.beginTransaction(); + connection1.write("1,1".getBytes()); + connection1.write("2,2".getBytes()); + connection1.abortTransaction(); + + connection2.beginTransaction(); + connection2.write("1,3".getBytes()); + connection2.write("2,3".getBytes()); + connection2.write("3,3".getBytes()); + connection2.abortTransaction(); + + assertAndCompactCleanAbort(dbName, tblName); + + connection1.close(); + connection2.close(); + } + + @Test + public void testCleanAbortCompactAfterAbort() throws Exception { + String dbName = "default"; + String tblName = "cws"; + + // Create 
three folders with two different transactions + HiveStreamingConnection connection1 = prepareTableAndConnection(dbName, tblName, 1); + HiveStreamingConnection connection2 = prepareTableAndConnection(dbName, tblName, 1); + + connection1.beginTransaction(); + connection1.write("1,1".getBytes()); + connection1.write("2,2".getBytes()); + connection1.abortTransaction(); + + connection2.beginTransaction(); + connection2.write("1,3".getBytes()); + connection2.write("2,3".getBytes()); + connection2.write("3,3".getBytes()); + connection2.abortTransaction(); + + assertAndCompactCleanAbort(dbName, tblName); + + connection1.close(); + connection2.close(); + } + + private void assertAndCompactCleanAbort(String dbName, String tblName) throws Exception { + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + Table table = msClient.getTable(dbName, tblName); + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = + fs.listStatus(new Path(table.getSd().getLocation())); + if (3 != stat.length) { + Assert.fail("Expecting three directories corresponding to three partitions, FileStatus[] stat " + Arrays.toString(stat)); + } + + int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where TC_OPERATION_TYPE='p'"); + // We should have two rows corresponding to the two aborted transactions + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 2, count); + + runInitiator(conf); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_TYPE='p'"); + // Only one job is added to the queue per table. 
This job corresponds to all the entries for a particular table + // with rows in TXN_COMPONENTS + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 1, count); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals(1, rsp.getCompacts().size()); + Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState()); + Assert.assertEquals("cws", rsp.getCompacts().get(0).getTablename()); + Assert.assertEquals(CompactionType.CLEAN_ABORTED, + rsp.getCompacts().get(0).getType()); + + runCleaner(conf); + + // After the cleaner runs TXN_COMPONENTS and COMPACTION_QUEUE should have zero rows, also the folders should have been deleted. + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 0, count); + + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 0, count); + + RemoteIterator it = + fs.listFiles(new Path(table.getSd().getLocation()), true); + if (it.hasNext()) { + Assert.fail("Expecting compaction to have cleaned the directories, FileStatus[] stat " + Arrays.toString(stat)); + } + + rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals(1, rsp.getCompacts().size()); + Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); + Assert.assertEquals("cws", rsp.getCompacts().get(0).getTablename()); + Assert.assertEquals(CompactionType.CLEAN_ABORTED, + rsp.getCompacts().get(0).getType()); + } + + @Test + public void testCleanAbortCompactSeveralTables() throws Exception { + String dbName = "default"; + String tblName1 = "cws1"; + String tblName2 = "cws2"; + + HiveStreamingConnection connection1 = prepareTableAndConnection(dbName, tblName1, 1); + HiveStreamingConnection connection2 = 
prepareTableAndConnection(dbName, tblName2, 1); + + connection1.beginTransaction(); + connection1.write("1,1".getBytes()); + connection1.write("2,2".getBytes()); + connection1.abortTransaction(); + + connection2.beginTransaction(); + connection2.write("1,1".getBytes()); + connection2.write("2,2".getBytes()); + connection2.abortTransaction(); + + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + FileSystem fs = FileSystem.get(conf); + Table table1 = msClient.getTable(dbName, tblName1); + FileStatus[] stat = + fs.listStatus(new Path(table1.getSd().getLocation())); + if (2 != stat.length) { + Assert.fail("Expecting two directories corresponding to two partitions, FileStatus[] stat " + Arrays.toString(stat)); + } + Table table2 = msClient.getTable(dbName, tblName2); + stat = fs.listStatus(new Path(table2.getSd().getLocation())); + if (2 != stat.length) { + Assert.fail("Expecting two directories corresponding to two partitions, FileStatus[] stat " + Arrays.toString(stat)); + } + + int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where TC_OPERATION_TYPE='p'"); + // We should have two rows corresponding to the two aborted transactions + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 2, count); + + runInitiator(conf); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_TYPE='p'"); + // Only one job is added to the queue per table. This job corresponds to all the entries for a particular table + // with rows in TXN_COMPONENTS + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 2, count); + + runCleaner(conf); + + // After the cleaner runs TXN_COMPONENTS and COMPACTION_QUEUE should have zero rows, also the folders should have been deleted. 
+ count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 0, count); + + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 0, count); + + RemoteIterator it = + fs.listFiles(new Path(table1.getSd().getLocation()), true); + if (it.hasNext()) { + Assert.fail("Expecting compaction to have cleaned the directories, FileStatus[] stat " + Arrays.toString(stat)); + } + + connection1.close(); + connection2.close(); + } + + @Test + public void testCleanAbortCorrectlyCleaned() throws Exception { + // Test that at commit the tables are cleaned properly + String dbName = "default"; + String tblName = "cws"; + HiveStreamingConnection connection = prepareTableAndConnection(dbName, tblName, 1); + connection.beginTransaction(); + connection.write("1,1".getBytes()); + connection.write("2,2".getBytes()); + + int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where TC_OPERATION_TYPE='p'"); + // We should have two rows corresponding to the two aborted transactions Review comment: this comment is copied I guess ########## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java ########## @@ -853,6 +857,273 @@ public void majorCompactAfterAbort() throws Exception { Lists.newArrayList(5, 6), 1); } + @Test + public void testCleanAbortCompactAfterAbortTwoPartitions() throws Exception { + String dbName = "default"; + String tblName = "cws"; + + HiveStreamingConnection connection1 = prepareTableTwoPartitionsAndConnection(dbName, tblName, 1); + HiveStreamingConnection connection2 = prepareTableTwoPartitionsAndConnection(dbName, tblName, 1); + + connection1.beginTransaction(); + connection1.write("1,1".getBytes()); + connection1.write("2,2".getBytes()); + connection1.abortTransaction(); + + 
connection2.beginTransaction(); + connection2.write("1,3".getBytes()); + connection2.write("2,3".getBytes()); + connection2.write("3,3".getBytes()); + connection2.abortTransaction(); + + assertAndCompactCleanAbort(dbName, tblName); + + connection1.close(); + connection2.close(); + } + + @Test + public void testCleanAbortCompactAfterAbort() throws Exception { + String dbName = "default"; + String tblName = "cws"; + + // Create three folders with two different transactions + HiveStreamingConnection connection1 = prepareTableAndConnection(dbName, tblName, 1); + HiveStreamingConnection connection2 = prepareTableAndConnection(dbName, tblName, 1); + + connection1.beginTransaction(); + connection1.write("1,1".getBytes()); + connection1.write("2,2".getBytes()); + connection1.abortTransaction(); + + connection2.beginTransaction(); + connection2.write("1,3".getBytes()); + connection2.write("2,3".getBytes()); + connection2.write("3,3".getBytes()); + connection2.abortTransaction(); + + assertAndCompactCleanAbort(dbName, tblName); + + connection1.close(); + connection2.close(); + } + + private void assertAndCompactCleanAbort(String dbName, String tblName) throws Exception { + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + Table table = msClient.getTable(dbName, tblName); + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = + fs.listStatus(new Path(table.getSd().getLocation())); + if (3 != stat.length) { + Assert.fail("Expecting three directories corresponding to three partitions, FileStatus[] stat " + Arrays.toString(stat)); + } + + int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where TC_OPERATION_TYPE='p'"); + // We should have two rows corresponding to the two aborted transactions + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 2, count); + + runInitiator(conf); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPACTION_QUEUE where CQ_TYPE='p'"); + // Only one job is added to the queue per table. This job corresponds to all the entries for a particular table + // with rows in TXN_COMPONENTS + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 1, count); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals(1, rsp.getCompacts().size()); + Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState()); + Assert.assertEquals("cws", rsp.getCompacts().get(0).getTablename()); + Assert.assertEquals(CompactionType.CLEAN_ABORTED, + rsp.getCompacts().get(0).getType()); + + runCleaner(conf); + + // After the cleaner runs TXN_COMPONENTS and COMPACTION_QUEUE should have zero rows, also the folders should have been deleted. + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 0, count); + + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 0, count); + + RemoteIterator it = + fs.listFiles(new Path(table.getSd().getLocation()), true); + if (it.hasNext()) { + Assert.fail("Expecting compaction to have cleaned the directories, FileStatus[] stat " + Arrays.toString(stat)); + } + + rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals(1, rsp.getCompacts().size()); + Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); + Assert.assertEquals("cws", rsp.getCompacts().get(0).getTablename()); + Assert.assertEquals(CompactionType.CLEAN_ABORTED, + rsp.getCompacts().get(0).getType()); + } + + @Test + public void testCleanAbortCompactSeveralTables() throws Exception { + String dbName = "default"; + String tblName1 = "cws1"; + String tblName2 = "cws2"; + + HiveStreamingConnection connection1 = 
prepareTableAndConnection(dbName, tblName1, 1); + HiveStreamingConnection connection2 = prepareTableAndConnection(dbName, tblName2, 1); + + connection1.beginTransaction(); + connection1.write("1,1".getBytes()); + connection1.write("2,2".getBytes()); + connection1.abortTransaction(); + + connection2.beginTransaction(); + connection2.write("1,1".getBytes()); + connection2.write("2,2".getBytes()); + connection2.abortTransaction(); + + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + FileSystem fs = FileSystem.get(conf); + Table table1 = msClient.getTable(dbName, tblName1); + FileStatus[] stat = + fs.listStatus(new Path(table1.getSd().getLocation())); + if (2 != stat.length) { + Assert.fail("Expecting two directories corresponding to two partitions, FileStatus[] stat " + Arrays.toString(stat)); + } + Table table2 = msClient.getTable(dbName, tblName2); + stat = fs.listStatus(new Path(table2.getSd().getLocation())); + if (2 != stat.length) { + Assert.fail("Expecting two directories corresponding to two partitions, FileStatus[] stat " + Arrays.toString(stat)); + } + + int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where TC_OPERATION_TYPE='p'"); + // We should have two rows corresponding to the two aborted transactions + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 2, count); + + runInitiator(conf); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_TYPE='p'"); + // Only one job is added to the queue per table. This job corresponds to all the entries for a particular table + // with rows in TXN_COMPONENTS + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 2, count); + + runCleaner(conf); + + // After the cleaner runs TXN_COMPONENTS and COMPACTION_QUEUE should have zero rows, also the folders should have been deleted. 
+ count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 0, count); + + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 0, count); + + RemoteIterator it = + fs.listFiles(new Path(table1.getSd().getLocation()), true); + if (it.hasNext()) { + Assert.fail("Expecting compaction to have cleaned the directories, FileStatus[] stat " + Arrays.toString(stat)); + } + + connection1.close(); + connection2.close(); + } + + @Test + public void testCleanAbortCorrectlyCleaned() throws Exception { + // Test that at commit the tables are cleaned properly + String dbName = "default"; + String tblName = "cws"; + HiveStreamingConnection connection = prepareTableAndConnection(dbName, tblName, 1); + connection.beginTransaction(); + connection.write("1,1".getBytes()); + connection.write("2,2".getBytes()); + + int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where TC_OPERATION_TYPE='p'"); + // We should have two rows corresponding to the two aborted transactions + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 1, count); + + connection.commitTransaction(); + + // After commit the row should have been deleted + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where TC_OPERATION_TYPE='p'"); + // We should have two rows corresponding to the two aborted transactions + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 0, count); + } + + @Test + public void testCleanAbortAndMinorCompact() throws Exception { + String dbName = "default"; + String tblName = "cws"; + + HiveStreamingConnection connection = prepareTableAndConnection(dbName, tblName, 1); + + connection.beginTransaction(); + connection.write("1,1".getBytes()); + 
connection.write("2,2".getBytes()); + connection.abortTransaction(); + + executeStatementOnDriver("insert into " + tblName + " partition (a) values (1, '1')", driver); + executeStatementOnDriver("delete from " + tblName + " where b=1", driver); + + conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 0); + runInitiator(conf); + runWorker(conf); + + int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 2, count); + // Cleaning should happen in threads concurrently for the minor compaction and the clean abort one. + runCleaner(conf); + + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 0, count); + + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 0, count); + + } + + private HiveStreamingConnection prepareTableAndConnection(String dbName, String tblName, int batchSize) throws Exception { Review comment: Could you add two more tests with batch size 2? In the first, the first batch writes to p1 and p2 and commits, and the second batch writes to p2 and p3 and aborts. The second test should do the same but with the aborted / committed order swapped.
########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ########## @@ -97,9 +100,9 @@ public void run() { long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner(); LOG.info("Cleaning based on min open txn id: " + minOpenTxnId); List<CompletableFuture> cleanerList = new ArrayList<>(); - for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) { + for (CompactionInfo compactionInfo : txnHandler.findReadyToClean()) { cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() -> - clean(compactionInfo, minOpenTxnId)), cleanerExecutor)); + clean(compactionInfo, minOpenTxnId)), cleanerExecutor)); Review comment: I agree, it can be addressed in a follow up Jira. ########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ########## @@ -232,6 +240,51 @@ public Object run() throws Exception { private static String idWatermark(CompactionInfo ci) { return " id=" + ci.id; } + + private void cleanAborted(CompactionInfo ci) throws MetaException { + if (ci.writeIds == null || ci.writeIds.size() == 0) { + LOG.warn("Attempted cleaning aborted transaction with empty writeId list"); Review comment: Shouldn't you mark the compaction failed or cleaned? 
########## File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java ########## @@ -107,11 +107,12 @@ public CompactionTxnHandler() { // Check for aborted txns: number of aborted txns past threshold and age of aborted txns // past time threshold boolean checkAbortedTimeThreshold = abortedTimeThreshold >= 0; - final String sCheckAborted = "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\"," - + "MIN(\"TXN_STARTED\"), COUNT(*)" + String sCheckAborted = "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", " + + "MIN(\"TXN_STARTED\"), COUNT(*), " + + "MAX(CASE WHEN \"TC_OPERATION_TYPE\" = " + OperationType.DYNPART + " THEN 1 ELSE 0 END) AS \"IS_DP\" " Review comment: Previously if you had aborted txn above threshold this would generate a "normal" compaction that would clean up everything. However now if you have one dynpart aborted the type will be CLEAN_ABORTED that will only clean the writeids belonging to p-type records and leave everything else. This will delay the normal cleaning. I am not sure that is a problem or not. ########## File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java ########## @@ -414,77 +436,56 @@ public void markCleaned(CompactionInfo info) throws MetaException { * aborted TXN_COMPONENTS above tc_writeid (and consequently about aborted txns). * See {@link ql.txn.compactor.Cleaner.removeFiles()} */ - s = "SELECT DISTINCT \"TXN_ID\" FROM \"TXNS\", \"TXN_COMPONENTS\" WHERE \"TXN_ID\" = \"TC_TXNID\" " - + "AND \"TXN_STATE\" = " + TxnStatus.ABORTED + " AND \"TC_DATABASE\" = ? 
AND \"TC_TABLE\" = ?"; - if (info.highestWriteId != 0) s += " AND \"TC_WRITEID\" <= ?"; - if (info.partName != null) s += " AND \"TC_PARTITION\" = ?"; + List<String> queries = new ArrayList<>(); + Iterator<Long> writeIdsIter = null; + List<Integer> counts = null; - pStmt = dbConn.prepareStatement(s); - paramCount = 1; - pStmt.setString(paramCount++, info.dbname); - pStmt.setString(paramCount++, info.tableName); - if(info.highestWriteId != 0) { - pStmt.setLong(paramCount++, info.highestWriteId); + s = "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" IN (" + + " SELECT \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + TxnStatus.ABORTED + ") " + Review comment: cool ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 498499) Time Spent: 10h 50m (was: 10h 40m) > Make sure transactions get cleaned if they are aborted before addPartitions > is called > ------------------------------------------------------------------------------------- > > Key: HIVE-21052 > URL: https://issues.apache.org/jira/browse/HIVE-21052 > Project: Hive > Issue Type: Bug > Components: Transactions > Affects Versions: 3.0.0, 3.1.1 > Reporter: Jaume M > Assignee: Jaume M > Priority: Critical > Labels: pull-request-available > Attachments: Aborted Txn w_Direct Write.pdf, HIVE-21052.1.patch, > HIVE-21052.10.patch, HIVE-21052.11.patch, HIVE-21052.12.patch, > HIVE-21052.2.patch, HIVE-21052.3.patch, HIVE-21052.4.patch, > HIVE-21052.5.patch, HIVE-21052.6.patch, HIVE-21052.7.patch, > HIVE-21052.8.patch, HIVE-21052.9.patch > > Time Spent: 10h 50m > Remaining Estimate: 0h > > If the transaction is aborted between openTxn and addPartitions and data has > been written on the table the 
transaction manager will think it's an empty > transaction and no cleaning will be done. > This is currently an issue in the streaming API and in micromanaged tables. > As proposed by [~ekoifman] this can be solved by: > * Writing an entry with a special marker to TXN_COMPONENTS at openTxn and > when addPartitions is called remove this entry from TXN_COMPONENTS and add > the corresponding partition entry to TXN_COMPONENTS. > * If the cleaner finds and entry with a special marker in TXN_COMPONENTS that > specifies that a transaction was opened and it was aborted it must generate > jobs for the worker for every possible partition available. > cc [~ewohlstadter] -- This message was sent by Atlassian Jira (v8.3.4#803005)