[ https://issues.apache.org/jira/browse/HIVE-23671?focusedWorklogId=452846&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-452846 ]
ASF GitHub Bot logged work on HIVE-23671: ----------------------------------------- Author: ASF GitHub Bot Created on: 30/Jun/20 07:28 Start Date: 30/Jun/20 07:28 Worklog Time Spent: 10m Work Description: deniskuzZ commented on a change in pull request #1087: URL: https://github.com/apache/hive/pull/1087#discussion_r447468545 ########## File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java ########## @@ -229,102 +239,168 @@ public int repair(MsckInfo msckInfo) { throw new MetastoreException(e); } } + if (transactionalTable && !MetaStoreServerUtils.isPartitioned(table)) { + if (result.getMaxWriteId() > 0) { + if (txnId < 0) { + // We need the txnId to check against even if we didn't do the locking + txnId = getMsc().openTxn(getUserName()); + } + + validateAndAddMaxTxnIdAndWriteId(result.getMaxWriteId(), result.getMaxTxnId(), + table.getDbName(), table.getTableName(), txnId); + } + } } success = true; } catch (Exception e) { LOG.warn("Failed to run metacheck: ", e); success = false; - ret = 1; } finally { - if (msckInfo.getResFile() != null) { - BufferedWriter resultOut = null; - try { - Path resFile = new Path(msckInfo.getResFile()); - FileSystem fs = resFile.getFileSystem(getConf()); - resultOut = new BufferedWriter(new OutputStreamWriter(fs.create(resFile))); - - boolean firstWritten = false; - firstWritten |= writeMsckResult(result.getTablesNotInMs(), - "Tables not in metastore:", resultOut, firstWritten); - firstWritten |= writeMsckResult(result.getTablesNotOnFs(), - "Tables missing on filesystem:", resultOut, firstWritten); - firstWritten |= writeMsckResult(result.getPartitionsNotInMs(), - "Partitions not in metastore:", resultOut, firstWritten); - firstWritten |= writeMsckResult(result.getPartitionsNotOnFs(), - "Partitions missing from filesystem:", resultOut, firstWritten); - firstWritten |= writeMsckResult(result.getExpiredPartitions(), - "Expired partitions (retention period: " + partitionExpirySeconds + 
"s) :", resultOut, firstWritten); - // sorting to stabilize qfile output (msck_repair_drop.q) - Collections.sort(repairOutput); - for (String rout : repairOutput) { - if (firstWritten) { - resultOut.write(terminator); - } else { - firstWritten = true; - } - resultOut.write(rout); - } - } catch (IOException e) { - LOG.warn("Failed to save metacheck output: ", e); - ret = 1; - } finally { - if (resultOut != null) { - try { - resultOut.close(); - } catch (IOException e) { - LOG.warn("Failed to close output file: ", e); - ret = 1; - } - } + if (result!=null) { + logResult(result); + if (msckInfo.getResFile() != null) { + success = writeResultToFile(msckInfo, result, repairOutput, partitionExpirySeconds) && success; } } - LOG.info("Tables not in metastore: {}", result.getTablesNotInMs()); - LOG.info("Tables missing on filesystem: {}", result.getTablesNotOnFs()); - LOG.info("Partitions not in metastore: {}", result.getPartitionsNotInMs()); - LOG.info("Partitions missing from filesystem: {}", result.getPartitionsNotOnFs()); - LOG.info("Expired partitions: {}", result.getExpiredPartitions()); - if (acquireLock && txnId > 0) { - if (success) { - try { - LOG.info("txnId: {} succeeded. Committing..", txnId); - getMsc().commitTxn(txnId); - } catch (Exception e) { - LOG.warn("Error while committing txnId: {} for table: {}", txnId, qualifiedTableName, e); - ret = 1; - } - } else { - try { - LOG.info("txnId: {} failed. Aborting..", txnId); - getMsc().abortTxns(Lists.newArrayList(txnId)); - } catch (Exception e) { - LOG.warn("Error while aborting txnId: {} for table: {}", txnId, qualifiedTableName, e); - ret = 1; - } - } + if (txnId > 0) { + success = closeTxn(qualifiedTableName, success, txnId) && success; } if (getMsc() != null) { getMsc().close(); msc = null; } } + return success ? 0 : 1; + } + private boolean closeTxn(String qualifiedTableName, boolean success, long txnId) { + boolean ret = true; + if (success) { + try { + LOG.info("txnId: {} succeeded. 
Committing..", txnId); + getMsc().commitTxn(txnId); + } catch (Exception e) { + LOG.warn("Error while committing txnId: {} for table: {}", txnId, qualifiedTableName, e); + ret = false; + } + } else { + try { + LOG.info("txnId: {} failed. Aborting..", txnId); + getMsc().abortTxns(Lists.newArrayList(txnId)); + } catch (Exception e) { + LOG.warn("Error while aborting txnId: {} for table: {}", txnId, qualifiedTableName, e); + ret = false; + } + } return ret; } - private LockRequest createLockRequest(final String dbName, final String tableName) throws TException { - UserGroupInformation loggedInUser = null; - String username; + private void logResult(CheckResult result) { + LOG.info("Tables not in metastore: {}", result.getTablesNotInMs()); + LOG.info("Tables missing on filesystem: {}", result.getTablesNotOnFs()); + LOG.info("Partitions not in metastore: {}", result.getPartitionsNotInMs()); + LOG.info("Partitions missing from filesystem: {}", result.getPartitionsNotOnFs()); + LOG.info("Expired partitions: {}", result.getExpiredPartitions()); + } + + private boolean writeResultToFile(MsckInfo msckInfo, CheckResult result, List<String> repairOutput, + long partitionExpirySeconds) { + boolean success = true; + BufferedWriter resultOut = null; try { - loggedInUser = UserGroupInformation.getLoginUser(); + Path resFile = new Path(msckInfo.getResFile()); + FileSystem fs = resFile.getFileSystem(getConf()); + resultOut = new BufferedWriter(new OutputStreamWriter(fs.create(resFile))); + + boolean firstWritten = false; + firstWritten |= writeMsckResult(result.getTablesNotInMs(), + "Tables not in metastore:", resultOut, firstWritten); + firstWritten |= writeMsckResult(result.getTablesNotOnFs(), + "Tables missing on filesystem:", resultOut, firstWritten); + firstWritten |= writeMsckResult(result.getPartitionsNotInMs(), + "Partitions not in metastore:", resultOut, firstWritten); + firstWritten |= writeMsckResult(result.getPartitionsNotOnFs(), + "Partitions missing from filesystem:", 
resultOut, firstWritten); + firstWritten |= writeMsckResult(result.getExpiredPartitions(), + "Expired partitions (retention period: " + partitionExpirySeconds + "s) :", resultOut, firstWritten); + // sorting to stabilize qfile output (msck_repair_drop.q) + Collections.sort(repairOutput); + for (String rout : repairOutput) { + if (firstWritten) { + resultOut.write(terminator); + } else { + firstWritten = true; + } + resultOut.write(rout); + } } catch (IOException e) { - LOG.warn("Unable to get logged in user via UGI. err: {}", e.getMessage()); + LOG.warn("Failed to save metacheck output: ", e); + success = false; + } finally { + if (resultOut != null) { + try { + resultOut.close(); + } catch (IOException e) { + LOG.warn("Failed to close output file: ", e); + success = false; + } + } } - if (loggedInUser == null) { - username = System.getProperty("user.name"); - } else { - username = loggedInUser.getShortUserName(); + return success; + } + + /** + * When we add new partitions to a transactional table, we have to check the writeIds. + * For every newly added partition, we read the maximum writeId from the directory structure + * and compare it to the maximum allocated writeId in the metastore. + * If the metastore has never allocated any, we are good; the use case would be initializing a table with + * existing data. The HMS will be initialized with the maximum writeId. The system will handle every delta directory + * as committed ones. + * If the writeId is higher in the metastore we can still accept the data; the use case would be after some data loss + * some older data backup was used. The system would be able to read the old data. + * If however the writeId in the new partition is greater than the maximum allocated in the HMS + * we must raise an error.
The writeId in the HMS should be increased to match the writeIds in the data files, + * but it would most likely cause a lot of problems since the transactional data would become inconsistent + * between the HMS and the filesystem. + * Furthermore we need to check for the visibilityTransactionIds written by the compaction. + * If we have a higher visibilityId in the directory structure than the current transactionId we need to set + * the transactionId sequence higher in the HMS so the next reads may read the content of the + * compacted base/delta folders. + * @param partsNotInMs partitions only in the FileSystem + * @param dbName database name + * @param tableName table name + * @param txnId actual transactionId + */ + private void validateAndAddMaxTxnIdAndWriteId(Set<CheckResult.PartitionResult> partsNotInMs, String dbName, + String tableName, long txnId) throws TException { + long maxWriteIdOnFilesystem = + partsNotInMs.stream().map(CheckResult.PartitionResult::getMaxWriteId).max(Long::compareTo).orElse(0L); + long maxVisibilityTxnId = + partsNotInMs.stream().map(CheckResult.PartitionResult::getMaxTxnId).max(Long::compareTo).orElse(0L); + validateAndAddMaxTxnIdAndWriteId(maxWriteIdOnFilesystem, maxVisibilityTxnId, dbName, tableName, txnId); + } + + private void validateAndAddMaxTxnIdAndWriteId(long maxWriteIdOnFilesystem, long maxVisibilityTxnId, String dbName, + String tableName, long txnId) throws TException { + long maxAllocatedWriteId = getMsc().getMaxAllocatedWriteId(dbName, tableName); + if (maxAllocatedWriteId > 0 && maxWriteIdOnFilesystem > maxAllocatedWriteId) { Review comment: what if we would like to ship FS deltas to the backup HMS due to some replication issue? could you please elaborate on what issues you see in increasing the writeId in HMS to match FS? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 452846) Time Spent: 5h (was: 4h 50m) > MSCK repair should handle transactional tables in certain usecases > ------------------------------------------------------------------ > > Key: HIVE-23671 > URL: https://issues.apache.org/jira/browse/HIVE-23671 > Project: Hive > Issue Type: Improvement > Components: Metastore > Reporter: Peter Varga > Assignee: Peter Varga > Priority: Major > Labels: pull-request-available > Time Spent: 5h > Remaining Estimate: 0h > > The MSCK REPAIR tool does not handle transactional tables too well. It can > find and add new partitions the same way as for non-transactional tables, but > since the writeId differences are not handled, the data cannot be read back > from the new partitions. > We could handle some usecases when the writeIds in the HMS and the underlying > data are not conflicting. If the HMS does not contain allocated writeIds for > the table we can seed the table with the writeIds read from the directory > structure. > Real life use cases could be: > * Copy data files from one cluster to another with different HMS, create the > table and call MSCK REPAIR > * If the HMS db is lost, recreate the table and call MSCK REPAIR > -- This message was sent by Atlassian Jira (v8.3.4#803005)