[ https://issues.apache.org/jira/browse/HIVE-21052?focusedWorklogId=498134&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-498134 ]
ASF GitHub Bot logged work on HIVE-21052: ----------------------------------------- Author: ASF GitHub Bot Created on: 09/Oct/20 13:42 Start Date: 09/Oct/20 13:42 Worklog Time Spent: 10m Work Description: deniskuzZ commented on a change in pull request #1548: URL: https://github.com/apache/hive/pull/1548#discussion_r499754423 ########## File path: ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ########## @@ -2839,6 +2848,87 @@ public static void setNonTransactional(Map<String, String> tblProps) { tblProps.remove(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); } + /** + * Look for delta directories matching the list of writeIds and deletes them. + * @param rootPartition root partition to look for the delta directories + * @param conf configuration + * @param writeIds list of writeIds to look for in the delta directories + * @return list of deleted directories. + * @throws IOException + */ + public static List<FileStatus> deleteDeltaDirectories(Path rootPartition, Configuration conf, Set<Long> writeIds) + throws IOException { + FileSystem fs = rootPartition.getFileSystem(conf); + + PathFilter filter = (p) -> { + String name = p.getName(); + for (Long wId : writeIds) { + if (name.startsWith(deltaSubdir(wId, wId)) && !name.contains("=")) { Review comment: changed, included delete_delta as well ########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ########## @@ -97,9 +100,9 @@ public void run() { long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner(); LOG.info("Cleaning based on min open txn id: " + minOpenTxnId); List<CompletableFuture> cleanerList = new ArrayList<>(); - for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) { + for (CompactionInfo compactionInfo : txnHandler.findReadyToClean()) { cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() -> - clean(compactionInfo, minOpenTxnId)), cleanerExecutor)); + clean(compactionInfo, minOpenTxnId)), cleanerExecutor)); Review comment: In original patch Map<String, NonReentrantReadWriteLock> tableLock = new ConcurrentHashMap<>() was used to prevent a concurrent p-clean (where the whole table will be scanned). I think, that is resolved by grouping p-cleans and recording list of writeIds that needs to be removed. @vpnvishv is that correct? ########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ########## @@ -97,9 +100,9 @@ public void run() { long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner(); LOG.info("Cleaning based on min open txn id: " + minOpenTxnId); List<CompletableFuture> cleanerList = new ArrayList<>(); - for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) { + for (CompactionInfo compactionInfo : txnHandler.findReadyToClean()) { cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() -> - clean(compactionInfo, minOpenTxnId)), cleanerExecutor)); + clean(compactionInfo, minOpenTxnId)), cleanerExecutor)); Review comment: In original patch Map<String, NonReentrantReadWriteLock> tableLock = new ConcurrentHashMap<>() was used to prevent a concurrent p-clean (where the whole table will be scanned). I think, that is resolved by grouping p-cleans and recording list of writeIds that needs to be removed. @vpnvishv is that correct? Also we do not allow concurrent Cleaners, their execution is mutexed. ########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ########## @@ -97,9 +100,9 @@ public void run() { long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner(); LOG.info("Cleaning based on min open txn id: " + minOpenTxnId); List<CompletableFuture> cleanerList = new ArrayList<>(); - for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) { + for (CompactionInfo compactionInfo : txnHandler.findReadyToClean()) { cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() -> - clean(compactionInfo, minOpenTxnId)), cleanerExecutor)); + clean(compactionInfo, minOpenTxnId)), cleanerExecutor)); Review comment: 1. In original patch Map<String, NonReentrantReadWriteLock> tableLock = new ConcurrentHashMap<>() was used to prevent a concurrent p-clean (where the whole table will be scanned). I think, that is resolved by grouping p-cleans and recording list of writeIds that needs to be removed. @vpnvishv is that correct? Also we do not allow concurrent Cleaners, their execution is mutexed. 2. was related to the following issue based on Map<String, NonReentrantReadWriteLock> tableLock = new ConcurrentHashMap<>() design: "Suppose you have p-type clean on table T that is running (i.e. has the Write lock) and you have 30 different partition clean requests (in T). The 30 per partition cleans will get blocked but they will tie up every thread in the pool while they are blocked, right? If so, no other clean (on any other table) will actually make progress until the p-type on T is done." I think, it's not valid now. ########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ########## @@ -97,9 +100,9 @@ public void run() { long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner(); LOG.info("Cleaning based on min open txn id: " + minOpenTxnId); List<CompletableFuture> cleanerList = new ArrayList<>(); - for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) { + for (CompactionInfo compactionInfo : txnHandler.findReadyToClean()) { cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() -> - clean(compactionInfo, minOpenTxnId)), cleanerExecutor)); + clean(compactionInfo, minOpenTxnId)), cleanerExecutor)); Review comment: 1. In original patch Map<String, NonReentrantReadWriteLock> tableLock = new ConcurrentHashMap<>() was used to prevent a concurrent p-clean (where the whole table will be scanned). I think, that is resolved by grouping p-cleans and recording list of writeIds that needs to be removed: https://github.com/apache/hive/pull/1548/files#diff-9cf3ae764b7a33b568a984d695aff837R328 @vpnvishv is that correct? Also we do not allow concurrent Cleaners, their execution is mutexed. 2. was related to the following issue based on Map<String, NonReentrantReadWriteLock> tableLock = new ConcurrentHashMap<>() design: "Suppose you have p-type clean on table T that is running (i.e. has the Write lock) and you have 30 different partition clean requests (in T). The 30 per partition cleans will get blocked but they will tie up every thread in the pool while they are blocked, right? If so, no other clean (on any other table) will actually make progress until the p-type on T is done." I think, it's not valid now. ########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ########## @@ -97,9 +100,9 @@ public void run() { long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner(); LOG.info("Cleaning based on min open txn id: " + minOpenTxnId); List<CompletableFuture> cleanerList = new ArrayList<>(); - for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) { + for (CompactionInfo compactionInfo : txnHandler.findReadyToClean()) { cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() -> - clean(compactionInfo, minOpenTxnId)), cleanerExecutor)); + clean(compactionInfo, minOpenTxnId)), cleanerExecutor)); Review comment: 1. In original patch Map<String, NonReentrantReadWriteLock> tableLock = new ConcurrentHashMap<>() was used to prevent a concurrent p-clean (where the whole table will be scanned). I think, that is resolved by grouping p-cleans and recording list of writeIds that needs to be removed: https://github.com/apache/hive/pull/1548/files#diff-9cf3ae764b7a33b568a984d695aff837R328 @vpnvishv is that correct? Also we do not allow concurrent Cleaners, their execution is mutexed. 2. was related to the following issue based on Map<String, NonReentrantReadWriteLock> tableLock = new ConcurrentHashMap<>() design: "Suppose you have p-type clean on table T that is running (i.e. has the Write lock) and you have 30 different partition clean requests (in T). The 30 per partition cleans will get blocked but they will tie up every thread in the pool while they are blocked, right? If so, no other clean (on any other table) will actually make progress until the p-type on T is done." Yes, it's still the case that we'll have to wait for all tasks to complete and if there is one slow task, we won't be able to submit new tasks. However not sure if it's a critical issue. I think, we can address it in a separate jira. ########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ########## @@ -97,9 +100,9 @@ public void run() { long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner(); LOG.info("Cleaning based on min open txn id: " + minOpenTxnId); List<CompletableFuture> cleanerList = new ArrayList<>(); - for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) { + for (CompactionInfo compactionInfo : txnHandler.findReadyToClean()) { cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() -> - clean(compactionInfo, minOpenTxnId)), cleanerExecutor)); + clean(compactionInfo, minOpenTxnId)), cleanerExecutor)); Review comment: 1. In original patch Map<String, NonReentrantReadWriteLock> tableLock = new ConcurrentHashMap<>() was used to prevent a concurrent p-clean (where the whole table will be scanned). I think, that is resolved by grouping p-cleans and recording list of writeIds that needs to be removed: https://github.com/apache/hive/pull/1548/files#diff-9cf3ae764b7a33b568a984d695aff837R328 @vpnvishv is that correct? Also we do not allow concurrent Cleaners, their execution is mutexed. 2. was related to the following issue based on Map<String, NonReentrantReadWriteLock> tableLock = new ConcurrentHashMap<>() design: "Suppose you have p-type clean on table T that is running (i.e. has the Write lock) and you have 30 different partition clean requests (in T). The 30 per partition cleans will get blocked but they will tie up every thread in the pool while they are blocked, right? If so, no other clean (on any other table) will actually make progress until the p-type on T is done." Yes, it's still the case that we'll have to wait for all tasks to complete and if there is one long-running task, we won't be able to submit new ones. However not sure if it's a critical issue. I think, we can address it in a separate jira. ########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ########## @@ -232,6 +240,51 @@ public Object run() throws Exception { private static String idWatermark(CompactionInfo ci) { return " id=" + ci.id; } + + private void cleanAborted(CompactionInfo ci) throws MetaException { + if (ci.writeIds == null || ci.writeIds.size() == 0) { + LOG.warn("Attempted cleaning aborted transaction with empty writeId list"); Review comment: yep, good catch! ########## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java ########## @@ -853,6 +857,273 @@ public void majorCompactAfterAbort() throws Exception { Lists.newArrayList(5, 6), 1); } + @Test + public void testCleanAbortCompactAfterAbortTwoPartitions() throws Exception { + String dbName = "default"; + String tblName = "cws"; + + HiveStreamingConnection connection1 = prepareTableTwoPartitionsAndConnection(dbName, tblName, 1); + HiveStreamingConnection connection2 = prepareTableTwoPartitionsAndConnection(dbName, tblName, 1); + + connection1.beginTransaction(); + connection1.write("1,1".getBytes()); + connection1.write("2,2".getBytes()); + connection1.abortTransaction(); + + connection2.beginTransaction(); + connection2.write("1,3".getBytes()); + connection2.write("2,3".getBytes()); + connection2.write("3,3".getBytes()); + connection2.abortTransaction(); + + assertAndCompactCleanAbort(dbName, tblName); + + connection1.close(); + connection2.close(); + } + + @Test + public void testCleanAbortCompactAfterAbort() throws Exception { + String dbName = "default"; + String tblName = "cws"; + + // Create three folders with two different transactions + HiveStreamingConnection connection1 = prepareTableAndConnection(dbName, tblName, 1); + HiveStreamingConnection connection2 = prepareTableAndConnection(dbName, tblName, 1); + + connection1.beginTransaction(); + connection1.write("1,1".getBytes()); + connection1.write("2,2".getBytes()); + connection1.abortTransaction(); + + connection2.beginTransaction(); + connection2.write("1,3".getBytes()); + connection2.write("2,3".getBytes()); + connection2.write("3,3".getBytes()); + connection2.abortTransaction(); + + assertAndCompactCleanAbort(dbName, tblName); + + connection1.close(); + connection2.close(); + } + + private void assertAndCompactCleanAbort(String dbName, String tblName) throws Exception { + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + Table table = msClient.getTable(dbName, tblName); + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = + fs.listStatus(new Path(table.getSd().getLocation())); + if (3 != stat.length) { + Assert.fail("Expecting three directories corresponding to three partitions, FileStatus[] stat " + Arrays.toString(stat)); + } + + int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where TC_OPERATION_TYPE='p'"); + // We should have two rows corresponding to the two aborted transactions + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 2, count); + + runInitiator(conf); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_TYPE='p'"); + // Only one job is added to the queue per table. This job corresponds to all the entries for a particular table + // with rows in TXN_COMPONENTS + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 1, count); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals(1, rsp.getCompacts().size()); + Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState()); + Assert.assertEquals("cws", rsp.getCompacts().get(0).getTablename()); + Assert.assertEquals(CompactionType.CLEAN_ABORTED, + rsp.getCompacts().get(0).getType()); + + runCleaner(conf); + + // After the cleaner runs TXN_COMPONENTS and COMPACTION_QUEUE should have zero rows, also the folders should have been deleted. + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 0, count); + + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 0, count); + + RemoteIterator it = + fs.listFiles(new Path(table.getSd().getLocation()), true); + if (it.hasNext()) { + Assert.fail("Expecting compaction to have cleaned the directories, FileStatus[] stat " + Arrays.toString(stat)); + } + + rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals(1, rsp.getCompacts().size()); + Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); + Assert.assertEquals("cws", rsp.getCompacts().get(0).getTablename()); + Assert.assertEquals(CompactionType.CLEAN_ABORTED, + rsp.getCompacts().get(0).getType()); + } + + @Test + public void testCleanAbortCompactSeveralTables() throws Exception { + String dbName = "default"; + String tblName1 = "cws1"; + String tblName2 = "cws2"; + + HiveStreamingConnection connection1 = prepareTableAndConnection(dbName, tblName1, 1); + HiveStreamingConnection connection2 = prepareTableAndConnection(dbName, tblName2, 1); + + connection1.beginTransaction(); + connection1.write("1,1".getBytes()); + connection1.write("2,2".getBytes()); + connection1.abortTransaction(); + + connection2.beginTransaction(); + connection2.write("1,1".getBytes()); + connection2.write("2,2".getBytes()); + connection2.abortTransaction(); + + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + FileSystem fs = FileSystem.get(conf); + Table table1 = msClient.getTable(dbName, tblName1); + FileStatus[] stat = + fs.listStatus(new Path(table1.getSd().getLocation())); + if (2 != stat.length) { + Assert.fail("Expecting two directories corresponding to two partitions, FileStatus[] stat " + Arrays.toString(stat)); + } + Table table2 = msClient.getTable(dbName, tblName2); + stat = fs.listStatus(new Path(table2.getSd().getLocation())); + if (2 != stat.length) { + Assert.fail("Expecting two directories corresponding to two partitions, FileStatus[] stat " + Arrays.toString(stat)); + } + + int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where TC_OPERATION_TYPE='p'"); + // We should have two rows corresponding to the two aborted transactions + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 2, count); + + runInitiator(conf); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_TYPE='p'"); + // Only one job is added to the queue per table. This job corresponds to all the entries for a particular table + // with rows in TXN_COMPONENTS + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 2, count); + + runCleaner(conf); + + // After the cleaner runs TXN_COMPONENTS and COMPACTION_QUEUE should have zero rows, also the folders should have been deleted. + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 0, count); + + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 0, count); + + RemoteIterator it = + fs.listFiles(new Path(table1.getSd().getLocation()), true); + if (it.hasNext()) { + Assert.fail("Expecting compaction to have cleaned the directories, FileStatus[] stat " + Arrays.toString(stat)); + } + + connection1.close(); + connection2.close(); + } + + @Test + public void testCleanAbortCorrectlyCleaned() throws Exception { + // Test that at commit the tables are cleaned properly + String dbName = "default"; + String tblName = "cws"; + HiveStreamingConnection connection = prepareTableAndConnection(dbName, tblName, 1); + connection.beginTransaction(); + connection.write("1,1".getBytes()); + connection.write("2,2".getBytes()); + + int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where TC_OPERATION_TYPE='p'"); + // We should have two rows corresponding to the two aborted transactions Review comment: fixed ########## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java ########## @@ -853,6 +857,273 @@ public void majorCompactAfterAbort() throws Exception { Lists.newArrayList(5, 6), 1); } + @Test + public void testCleanAbortCompactAfterAbortTwoPartitions() throws Exception { + String dbName = "default"; + String tblName = "cws"; + + HiveStreamingConnection connection1 = prepareTableTwoPartitionsAndConnection(dbName, tblName, 1); + HiveStreamingConnection connection2 = prepareTableTwoPartitionsAndConnection(dbName, tblName, 1); + + connection1.beginTransaction(); + connection1.write("1,1".getBytes()); + connection1.write("2,2".getBytes()); + connection1.abortTransaction(); + + connection2.beginTransaction(); + connection2.write("1,3".getBytes()); + connection2.write("2,3".getBytes()); + connection2.write("3,3".getBytes()); + connection2.abortTransaction(); + + assertAndCompactCleanAbort(dbName, tblName); + + connection1.close(); + connection2.close(); + } + + @Test + public void testCleanAbortCompactAfterAbort() throws Exception { + String dbName = "default"; + String tblName = "cws"; + + // Create three folders with two different transactions + HiveStreamingConnection connection1 = prepareTableAndConnection(dbName, tblName, 1); + HiveStreamingConnection connection2 = prepareTableAndConnection(dbName, tblName, 1); + + connection1.beginTransaction(); + connection1.write("1,1".getBytes()); + connection1.write("2,2".getBytes()); + connection1.abortTransaction(); + + connection2.beginTransaction(); + connection2.write("1,3".getBytes()); + connection2.write("2,3".getBytes()); + connection2.write("3,3".getBytes()); + connection2.abortTransaction(); + + assertAndCompactCleanAbort(dbName, tblName); + + connection1.close(); + connection2.close(); + } + + private void assertAndCompactCleanAbort(String dbName, String tblName) throws Exception { + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + Table table = msClient.getTable(dbName, tblName); + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = + fs.listStatus(new Path(table.getSd().getLocation())); + if (3 != stat.length) { + Assert.fail("Expecting three directories corresponding to three partitions, FileStatus[] stat " + Arrays.toString(stat)); + } + + int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where TC_OPERATION_TYPE='p'"); + // We should have two rows corresponding to the two aborted transactions + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 2, count); + + runInitiator(conf); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_TYPE='p'"); + // Only one job is added to the queue per table. This job corresponds to all the entries for a particular table + // with rows in TXN_COMPONENTS + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 1, count); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals(1, rsp.getCompacts().size()); + Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState()); + Assert.assertEquals("cws", rsp.getCompacts().get(0).getTablename()); + Assert.assertEquals(CompactionType.CLEAN_ABORTED, + rsp.getCompacts().get(0).getType()); + + runCleaner(conf); + + // After the cleaner runs TXN_COMPONENTS and COMPACTION_QUEUE should have zero rows, also the folders should have been deleted. + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 0, count); + + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 0, count); + + RemoteIterator it = + fs.listFiles(new Path(table.getSd().getLocation()), true); + if (it.hasNext()) { + Assert.fail("Expecting compaction to have cleaned the directories, FileStatus[] stat " + Arrays.toString(stat)); Review comment: yes, recursive listing skips empty directories - that's was done intentionally. changed the comment ########## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java ########## @@ -853,6 +857,273 @@ public void majorCompactAfterAbort() throws Exception { Lists.newArrayList(5, 6), 1); } + @Test + public void testCleanAbortCompactAfterAbortTwoPartitions() throws Exception { + String dbName = "default"; + String tblName = "cws"; + + HiveStreamingConnection connection1 = prepareTableTwoPartitionsAndConnection(dbName, tblName, 1); + HiveStreamingConnection connection2 = prepareTableTwoPartitionsAndConnection(dbName, tblName, 1); + + connection1.beginTransaction(); + connection1.write("1,1".getBytes()); + connection1.write("2,2".getBytes()); + connection1.abortTransaction(); + + connection2.beginTransaction(); + connection2.write("1,3".getBytes()); + connection2.write("2,3".getBytes()); + connection2.write("3,3".getBytes()); + connection2.abortTransaction(); + + assertAndCompactCleanAbort(dbName, tblName); + + connection1.close(); + connection2.close(); + } + + @Test + public void testCleanAbortCompactAfterAbort() throws Exception { + String dbName = "default"; + String tblName = "cws"; + + // Create three folders with two different transactions + HiveStreamingConnection connection1 = prepareTableAndConnection(dbName, tblName, 1); + HiveStreamingConnection connection2 = prepareTableAndConnection(dbName, tblName, 1); + + connection1.beginTransaction(); + connection1.write("1,1".getBytes()); + connection1.write("2,2".getBytes()); + connection1.abortTransaction(); + + connection2.beginTransaction(); + connection2.write("1,3".getBytes()); + connection2.write("2,3".getBytes()); + connection2.write("3,3".getBytes()); + connection2.abortTransaction(); + + assertAndCompactCleanAbort(dbName, tblName); + + connection1.close(); + connection2.close(); + } + + private void assertAndCompactCleanAbort(String dbName, String tblName) throws Exception { + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + Table table = msClient.getTable(dbName, tblName); + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = + fs.listStatus(new Path(table.getSd().getLocation())); + if (3 != stat.length) { + Assert.fail("Expecting three directories corresponding to three partitions, FileStatus[] stat " + Arrays.toString(stat)); + } + + int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where TC_OPERATION_TYPE='p'"); + // We should have two rows corresponding to the two aborted transactions + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 2, count); + + runInitiator(conf); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_TYPE='p'"); + // Only one job is added to the queue per table. This job corresponds to all the entries for a particular table + // with rows in TXN_COMPONENTS + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 1, count); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals(1, rsp.getCompacts().size()); + Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState()); + Assert.assertEquals("cws", rsp.getCompacts().get(0).getTablename()); + Assert.assertEquals(CompactionType.CLEAN_ABORTED, + rsp.getCompacts().get(0).getType()); + + runCleaner(conf); + + // After the cleaner runs TXN_COMPONENTS and COMPACTION_QUEUE should have zero rows, also the folders should have been deleted. + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 0, count); + + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 0, count); + + RemoteIterator it = + fs.listFiles(new Path(table.getSd().getLocation()), true); + if (it.hasNext()) { + Assert.fail("Expecting compaction to have cleaned the directories, FileStatus[] stat " + Arrays.toString(stat)); + } + + rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals(1, rsp.getCompacts().size()); + Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); + Assert.assertEquals("cws", rsp.getCompacts().get(0).getTablename()); + Assert.assertEquals(CompactionType.CLEAN_ABORTED, + rsp.getCompacts().get(0).getType()); + } + + @Test + public void testCleanAbortCompactSeveralTables() throws Exception { + String dbName = "default"; + String tblName1 = "cws1"; + String tblName2 = "cws2"; + + HiveStreamingConnection connection1 = prepareTableAndConnection(dbName, tblName1, 1); + HiveStreamingConnection connection2 = prepareTableAndConnection(dbName, tblName2, 1); + + connection1.beginTransaction(); + connection1.write("1,1".getBytes()); + connection1.write("2,2".getBytes()); + connection1.abortTransaction(); + + connection2.beginTransaction(); + connection2.write("1,1".getBytes()); + connection2.write("2,2".getBytes()); + connection2.abortTransaction(); + + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + FileSystem fs = FileSystem.get(conf); + Table table1 = msClient.getTable(dbName, tblName1); + FileStatus[] stat = + fs.listStatus(new Path(table1.getSd().getLocation())); + if (2 != stat.length) { + Assert.fail("Expecting two directories corresponding to two partitions, FileStatus[] stat " + Arrays.toString(stat)); + } + Table table2 = msClient.getTable(dbName, tblName2); + stat = fs.listStatus(new Path(table2.getSd().getLocation())); + if (2 != stat.length) { + Assert.fail("Expecting two directories corresponding to two partitions, FileStatus[] stat " + Arrays.toString(stat)); + } + + int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where TC_OPERATION_TYPE='p'"); + // We should have two rows corresponding to the two aborted transactions + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 2, count); + + runInitiator(conf); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_TYPE='p'"); + // Only one job is added to the queue per table. This job corresponds to all the entries for a particular table + // with rows in TXN_COMPONENTS + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 2, count); + + runCleaner(conf); + + // After the cleaner runs TXN_COMPONENTS and COMPACTION_QUEUE should have zero rows, also the folders should have been deleted. + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 0, count); + + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 0, count); + + RemoteIterator it = + fs.listFiles(new Path(table1.getSd().getLocation()), true); + if (it.hasNext()) { + Assert.fail("Expecting compaction to have cleaned the directories, FileStatus[] stat " + Arrays.toString(stat)); + } + + connection1.close(); + connection2.close(); + } + + @Test + public void testCleanAbortCorrectlyCleaned() throws Exception { + // Test that at commit the tables are cleaned properly + String dbName = "default"; + String tblName = "cws"; + HiveStreamingConnection connection = prepareTableAndConnection(dbName, tblName, 1); + connection.beginTransaction(); + connection.write("1,1".getBytes()); + connection.write("2,2".getBytes()); + + int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where TC_OPERATION_TYPE='p'"); + // We should have two rows corresponding to the two aborted transactions + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 1, count); + + connection.commitTransaction(); + + // After commit the row should have been deleted + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where TC_OPERATION_TYPE='p'"); + // We should have two rows corresponding to the two aborted transactions + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 0, count); + } + + @Test + public void testCleanAbortAndMinorCompact() throws Exception { + String dbName = "default"; + String tblName = "cws"; + + HiveStreamingConnection connection = prepareTableAndConnection(dbName, tblName, 1); + + connection.beginTransaction(); + connection.write("1,1".getBytes()); + connection.write("2,2".getBytes()); + connection.abortTransaction(); + + executeStatementOnDriver("insert into " + tblName + " partition (a) values (1, '1')", driver); + executeStatementOnDriver("delete from " + tblName + " where b=1", driver); + + conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 0); + runInitiator(conf); + runWorker(conf); + + int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 2, count); + // Cleaning should happen in threads concurrently for the minor compaction and the clean abort one. + runCleaner(conf); + + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 0, count); + + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 0, count); + + } + + private HiveStreamingConnection prepareTableAndConnection(String dbName, String tblName, int batchSize) throws Exception { Review comment: added ########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ########## @@ -232,6 +240,51 @@ public Object run() throws Exception { private static String idWatermark(CompactionInfo ci) { return " id=" + ci.id; } + + private void cleanAborted(CompactionInfo ci) throws MetaException { + if (ci.writeIds == null || ci.writeIds.size() == 0) { + LOG.warn("Attempted cleaning aborted transaction with empty writeId list"); Review comment: fixed ########## File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java ########## @@ -107,11 +107,12 @@ public CompactionTxnHandler() { // Check for aborted txns: number of aborted txns past threshold and age of aborted txns // past time threshold boolean checkAbortedTimeThreshold = abortedTimeThreshold >= 0; - final String sCheckAborted = "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\"," - + "MIN(\"TXN_STARTED\"), COUNT(*)" + String sCheckAborted = "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", " + + "MIN(\"TXN_STARTED\"), COUNT(*), " + + "MAX(CASE WHEN \"TC_OPERATION_TYPE\" = " + OperationType.DYNPART + " THEN 1 ELSE 0 END) AS \"IS_DP\" " Review comment: I still don't follow. Aborted txn check is done per db/table/partition, so if you have db1/tbl1/{p1-p100}/type=NOT_DP and db1/tbl1/null/type=DP - that should generate 2 entries in potential compactions. ########## File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java ########## @@ -107,11 +107,12 @@ public CompactionTxnHandler() { // Check for aborted txns: number of aborted txns past threshold and age of aborted txns // past time threshold boolean checkAbortedTimeThreshold = abortedTimeThreshold >= 0; - final String sCheckAborted = "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\"," - + "MIN(\"TXN_STARTED\"), COUNT(*)" + String sCheckAborted = "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", " + + "MIN(\"TXN_STARTED\"), COUNT(*), " + + "MAX(CASE WHEN \"TC_OPERATION_TYPE\" = " + OperationType.DYNPART + " THEN 1 ELSE 0 END) AS \"IS_DP\" " Review comment: oh, sorry, I only considered time based threshold ########## File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java ########## @@ -107,11 +107,12 @@ public CompactionTxnHandler() { // Check for aborted txns: number of aborted txns past threshold and age of aborted txns // past time threshold boolean checkAbortedTimeThreshold = abortedTimeThreshold >= 0; - final String sCheckAborted = "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\"," - + "MIN(\"TXN_STARTED\"), COUNT(*)" + String sCheckAborted = "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", " + + "MIN(\"TXN_STARTED\"), COUNT(*), " + + "MAX(CASE WHEN \"TC_OPERATION_TYPE\" = " + OperationType.DYNPART + " THEN 1 ELSE 0 END) AS \"IS_DP\" " Review comment: oh, sorry, I only considered time based threshold for DYN_PART ########## File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java ########## @@ -107,11 +107,12 @@ public CompactionTxnHandler() { // Check for aborted txns: number of aborted txns past threshold and age of aborted txns // past time threshold boolean checkAbortedTimeThreshold = abortedTimeThreshold >= 0; - final String sCheckAborted = "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\"," - + "MIN(\"TXN_STARTED\"), COUNT(*)" + String sCheckAborted = "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", " + + "MIN(\"TXN_STARTED\"), COUNT(*), " + + "MAX(CASE WHEN \"TC_OPERATION_TYPE\" = " + OperationType.DYNPART + " THEN 1 ELSE 0 END) AS \"IS_DP\" " Review comment: I still don't follow. Aborted txn check is done per db/table/partition, so if you have db1/tbl1/p1/type=NOT_DP and db1/tbl1/null/type=DP - that should generate 2 entries in potential compactions. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 498134) Time Spent: 10.5h (was: 10h 20m) > Make sure transactions get cleaned if they are aborted before addPartitions > is called > ------------------------------------------------------------------------------------- > > Key: HIVE-21052 > URL: https://issues.apache.org/jira/browse/HIVE-21052 > Project: Hive > Issue Type: Bug > Components: Transactions > Affects Versions: 3.0.0, 3.1.1 > Reporter: Jaume M > Assignee: Jaume M > Priority: Critical > Labels: pull-request-available > Attachments: Aborted Txn w_Direct Write.pdf, HIVE-21052.1.patch, > HIVE-21052.10.patch, HIVE-21052.11.patch, HIVE-21052.12.patch, > HIVE-21052.2.patch, HIVE-21052.3.patch, HIVE-21052.4.patch, > HIVE-21052.5.patch, HIVE-21052.6.patch, HIVE-21052.7.patch, > HIVE-21052.8.patch, HIVE-21052.9.patch > > Time Spent: 10.5h > Remaining Estimate: 0h > > If the transaction is aborted between openTxn and addPartitions and data has > been written on the table the transaction manager will think it's an empty > transaction and no cleaning will be done. > This is currently an issue in the streaming API and in micromanaged tables. > As proposed by [~ekoifman] this can be solved by: > * Writing an entry with a special marker to TXN_COMPONENTS at openTxn and > when addPartitions is called remove this entry from TXN_COMPONENTS and add > the corresponding partition entry to TXN_COMPONENTS. > * If the cleaner finds and entry with a special marker in TXN_COMPONENTS that > specifies that a transaction was opened and it was aborted it must generate > jobs for the worker for every possible partition available. > cc [~ewohlstadter] -- This message was sent by Atlassian Jira (v8.3.4#803005)