[ https://issues.apache.org/jira/browse/HIVE-26267?focusedWorklogId=777956&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-777956 ]
ASF GitHub Bot logged work on HIVE-26267:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Jun/22 07:45
            Start Date: 03/Jun/22 07:45
    Worklog Time Spent: 10m
      Work Description: deniskuzZ commented on code in PR #3325:
URL: https://github.com/apache/hive/pull/3325#discussion_r888700909


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -3701,122 +3698,127 @@ public CompactionResponse compact(CompactionRequest rqst) throws MetaException {
        * compactions for any resource.
        */
       handle = getMutexAPI().acquireLock(MUTEX_KEY.CompactionScheduler.name());
-      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-      stmt = dbConn.createStatement();
-      long id = generateCompactionQueueId(stmt);
-
-      GetValidWriteIdsRequest request = new GetValidWriteIdsRequest(
-          Collections.singletonList(getFullTableName(rqst.getDbname(), rqst.getTablename())));
-      final ValidCompactorWriteIdList tblValidWriteIds =
-          TxnUtils.createValidCompactWriteIdList(getValidWriteIds(request).getTblValidWriteIds().get(0));
-      LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString());
+      try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED)) {
+        try (Statement stmt = dbConn.createStatement()) {
+
+          long id = generateCompactionQueueId(stmt);
+
+          GetValidWriteIdsRequest request = new GetValidWriteIdsRequest(
+              Collections.singletonList(getFullTableName(rqst.getDbname(), rqst.getTablename())));
+          final ValidCompactorWriteIdList tblValidWriteIds =
+              TxnUtils.createValidCompactWriteIdList(getValidWriteIds(request).getTblValidWriteIds().get(0));
+          LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString());
+
+          StringBuilder sb = new StringBuilder("SELECT \"CQ_ID\", \"CQ_STATE\" FROM \"COMPACTION_QUEUE\" WHERE").
+              append(" (\"CQ_STATE\" IN(").
+              append(quoteChar(INITIATED_STATE)).append(",").append(quoteChar(WORKING_STATE)).
+              append(") OR (\"CQ_STATE\" = ").append(quoteChar(READY_FOR_CLEANING)).
+              append(" AND \"CQ_HIGHEST_WRITE_ID\" = ?))").
+              append(" AND \"CQ_DATABASE\"=?").
+              append(" AND \"CQ_TABLE\"=?").append(" AND ");
+          if(rqst.getPartitionname() == null) {
+            sb.append("\"CQ_PARTITION\" is null");
+          } else {
+            sb.append("\"CQ_PARTITION\"=?");
+          }
 
-      List<String> params = new ArrayList<>();
-      StringBuilder sb = new StringBuilder("SELECT \"CQ_ID\", \"CQ_STATE\" FROM \"COMPACTION_QUEUE\" WHERE").
-          append(" (\"CQ_STATE\" IN(").
-          append(quoteChar(INITIATED_STATE)).append(",").append(quoteChar(WORKING_STATE)).
-          append(") OR (\"CQ_STATE\" = ").append(quoteChar(READY_FOR_CLEANING)).
-          append(" AND \"CQ_HIGHEST_WRITE_ID\" = ?))").
-          append(" AND \"CQ_DATABASE\"=?").
- append(" AND \"CQ_TABLE\"=?").append(" AND "); - params.add(Long.toString(tblValidWriteIds.getHighWatermark())); - params.add(rqst.getDbname()); - params.add(rqst.getTablename()); - if(rqst.getPartitionname() == null) { - sb.append("\"CQ_PARTITION\" is null"); - } else { - sb.append("\"CQ_PARTITION\"=?"); - params.add(rqst.getPartitionname()); - } + try (PreparedStatement pst = dbConn.prepareStatement(sqlGenerator.addEscapeCharacters(sb.toString()))) { + pst.setLong(1, tblValidWriteIds.getHighWatermark()); + pst.setString(2, rqst.getDbname()); + pst.setString(3, rqst.getTablename()); + if (rqst.getPartitionname() != null) { + pst.setString(4, rqst.getPartitionname()); + } + LOG.debug("Going to execute query <" + sb + ">"); + try (ResultSet rs = pst.executeQuery()) { + if(rs.next()) { + long enqueuedId = rs.getLong(1); + String state = compactorStateToResponse(rs.getString(2).charAt(0)); + LOG.info("Ignoring request to compact " + rqst.getDbname() + "/" + rqst.getTablename() + + "/" + rqst.getPartitionname() + " since it is already " + quoteString(state) + + " with id=" + enqueuedId); + CompactionResponse resp = new CompactionResponse(-1, REFUSED_RESPONSE, false); + resp.setErrormessage("Compaction is already scheduled with state=" + quoteString(state) + + " and id=" + enqueuedId); + return resp; + } + } + } + List<String> params = new ArrayList<>(); + StringBuilder buf = new StringBuilder("INSERT INTO \"COMPACTION_QUEUE\" (\"CQ_ID\", \"CQ_DATABASE\", " + + "\"CQ_TABLE\", "); + String partName = rqst.getPartitionname(); + if (partName != null) buf.append("\"CQ_PARTITION\", "); + buf.append("\"CQ_STATE\", \"CQ_TYPE\", \"CQ_ENQUEUE_TIME\""); + if (rqst.getProperties() != null) { + buf.append(", \"CQ_TBLPROPERTIES\""); + } + if (rqst.getRunas() != null) { + buf.append(", \"CQ_RUN_AS\""); + } + if (rqst.getInitiatorId() != null) { + buf.append(", \"CQ_INITIATOR_ID\""); + } + if (rqst.getInitiatorVersion() != null) { + buf.append(", \"CQ_INITIATOR_VERSION\""); + } + buf.append(") values ("); + buf.append(id); + buf.append(", ?"); + buf.append(", ?"); + buf.append(", "); + params.add(rqst.getDbname()); + params.add(rqst.getTablename()); + if (partName != null) { + buf.append("?, '"); + params.add(partName); + } else { + buf.append("'"); + } + buf.append(INITIATED_STATE); + buf.append("', '"); + buf.append(thriftCompactionType2DbType(rqst.getType())); + buf.append("',"); + buf.append(getEpochFn(dbProduct)); + if (rqst.getProperties() != null) { + buf.append(", ?"); + params.add(new StringableMap(rqst.getProperties()).toString()); + } + if (rqst.getRunas() != null) { + buf.append(", ?"); + params.add(rqst.getRunas()); + } + if (rqst.getInitiatorId() != null) { + buf.append(", ?"); + params.add(rqst.getInitiatorId()); + } + if (rqst.getInitiatorVersion() != null) { + buf.append(", ?"); + params.add(rqst.getInitiatorVersion()); + } + buf.append(")"); + String s = buf.toString(); - pst = sqlGenerator.prepareStmtWithParameters(dbConn, sb.toString(), params); - LOG.debug("Going to execute query <" + sb + ">"); - ResultSet rs = pst.executeQuery(); - if(rs.next()) { - long enqueuedId = rs.getLong(1); - String state = compactorStateToResponse(rs.getString(2).charAt(0)); - LOG.info("Ignoring request to compact " + rqst.getDbname() + "/" + rqst.getTablename() + - "/" + rqst.getPartitionname() + " since it is already " + quoteString(state) + - " with id=" + enqueuedId); - CompactionResponse resp = new CompactionResponse(-1, REFUSED_RESPONSE, false); - resp.setErrormessage("Compaction is already scheduled 
with state=" + quoteString(state) + - " and id=" + enqueuedId); - return resp; - } - close(rs); - closeStmt(pst); - params.clear(); - StringBuilder buf = new StringBuilder("INSERT INTO \"COMPACTION_QUEUE\" (\"CQ_ID\", \"CQ_DATABASE\", " + - "\"CQ_TABLE\", "); - String partName = rqst.getPartitionname(); - if (partName != null) buf.append("\"CQ_PARTITION\", "); - buf.append("\"CQ_STATE\", \"CQ_TYPE\", \"CQ_ENQUEUE_TIME\""); - if (rqst.getProperties() != null) { - buf.append(", \"CQ_TBLPROPERTIES\""); - } - if (rqst.getRunas() != null) { - buf.append(", \"CQ_RUN_AS\""); - } - if (rqst.getInitiatorId() != null) { - buf.append(", \"CQ_INITIATOR_ID\""); - } - if (rqst.getInitiatorVersion() != null) { - buf.append(", \"CQ_INITIATOR_VERSION\""); - } - buf.append(") values ("); - buf.append(id); - buf.append(", ?"); - buf.append(", ?"); - buf.append(", "); - params.add(rqst.getDbname()); - params.add(rqst.getTablename()); - if (partName != null) { - buf.append("?, '"); - params.add(partName); - } else { - buf.append("'"); - } - buf.append(INITIATED_STATE); - buf.append("', '"); - buf.append(thriftCompactionType2DbType(rqst.getType())); - buf.append("',"); - buf.append(getEpochFn(dbProduct)); - if (rqst.getProperties() != null) { - buf.append(", ?"); - params.add(new StringableMap(rqst.getProperties()).toString()); - } - if (rqst.getRunas() != null) { - buf.append(", ?"); - params.add(rqst.getRunas()); - } - if (rqst.getInitiatorId() != null) { - buf.append(", ?"); - params.add(rqst.getInitiatorId()); - } - if (rqst.getInitiatorVersion() != null) { - buf.append(", ?"); - params.add(rqst.getInitiatorVersion()); + try (PreparedStatement pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params)) { + LOG.debug("Going to execute update <" + s + ">"); + pst.executeUpdate(); + } + LOG.debug("Going to commit"); + dbConn.commit(); + return new CompactionResponse(id, INITIATED_RESPONSE, true); + } catch (SQLException e) { + dbConn.rollback(); + throw e; + } } - buf.append(")"); - String s = buf.toString(); - pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); - LOG.debug("Going to execute update <" + s + ">"); - pst.executeUpdate(); - LOG.debug("Going to commit"); - dbConn.commit(); - return new CompactionResponse(id, INITIATED_RESPONSE, true); } catch (SQLException e) { LOG.debug("Going to rollback: ", e); Review Comment: please move this logging to the line where rollback is called Issue Time Tracking ------------------- Worklog Id: (was: 777956) Time Spent: 2h 20m (was: 2h 10m) > Addendum to HIVE-26107: perpared statement is not working on Postgres > --------------------------------------------------------------------- > > Key: HIVE-26267 > URL: https://issues.apache.org/jira/browse/HIVE-26267 > Project: Hive > Issue Type: Bug > Reporter: László Végh > Assignee: László Végh > Priority: Major > Labels: pull-request-available > Time Spent: 2h 20m > Remaining Estimate: 0h > > The assembled prepared statement in > {code:java} > org.apache.hadoop.hive.metastore.txn.TxnHandler#compact{code} > does not work for Postgres DB. -- This message was sent by Atlassian Jira (v8.20.7#820007)