morningman commented on a change in pull request #2222: Publish version immediately after txt commited URL: https://github.com/apache/incubator-doris/pull/2222#discussion_r349109254
########## File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java ########## @@ -578,78 +686,23 @@ private void clearBackendTransactions(TransactionState transactionState) { } } - /* - * get all txns which is ready to publish - * a ready-to-publish txn's partition's visible version should be ONE less than txn's commit version. - */ - public List<TransactionState> getReadyToPublishTransactions() throws UserException { - List<TransactionState> readyPublishTransactionState = new ArrayList<>(); - List<TransactionState> allCommittedTransactionState = null; + + public List<TransactionState> getReadyToPublishTransactions() { + List<TransactionState> allCommittedTransactionState; writeLock(); try { - // only send task to committed transaction - allCommittedTransactionState = idToTransactionState.values().stream() + allCommittedTransactionState = idToTransactionState.values().parallelStream() .filter(transactionState -> (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED)) .collect(Collectors.toList()); - for (TransactionState transactionState : allCommittedTransactionState) { - long dbId = transactionState.getDbId(); - Database db = catalog.getDb(dbId); - if (null == db) { - transactionState.setTransactionStatus(TransactionStatus.ABORTED); - unprotectUpsertTransactionState(transactionState); - continue; - } - } } finally { writeUnlock(); } - - for (TransactionState transactionState : allCommittedTransactionState) { - boolean meetPublishPredicate = true; - long dbId = transactionState.getDbId(); - Database db = catalog.getDb(dbId); - if (null == db) { - continue; - } - db.readLock(); - try { - readLock(); - try { - for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) { - OlapTable table = (OlapTable) db.getTable(tableCommitInfo.getTableId()); - if (null == table) { - LOG.warn("table {} is dropped after commit, ignore this table", - tableCommitInfo.getTableId()); - continue; - } - for 
(PartitionCommitInfo partitionCommitInfo : tableCommitInfo.getIdToPartitionCommitInfo().values()) { - Partition partition = table.getPartition(partitionCommitInfo.getPartitionId()); - if (null == partition) { - LOG.warn("partition {} is dropped after commit, ignore this partition", - partitionCommitInfo.getPartitionId()); - continue; - } - if (partitionCommitInfo.getVersion() != partition.getVisibleVersion() + 1) { - meetPublishPredicate = false; - break; - } - } - if (!meetPublishPredicate) { - break; - } - } - if (meetPublishPredicate) { - LOG.debug("transaction [{}] is ready to publish", transactionState); - readyPublishTransactionState.add(transactionState); - } - } finally { - readUnlock(); - } - } finally { - db.readUnlock(); - } - } - return readyPublishTransactionState; + + LOG.debug("there are {} transactions is COMMITTED", allCommittedTransactionState.size()); + + return allCommittedTransactionState.parallelStream() Review comment: parallelStream() performs poorly when the number of elements in the collection is small. Don't you think `stream()` is enough? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org