morningman commented on a change in pull request #2222: Publish version 
immediately after txn committed
URL: https://github.com/apache/incubator-doris/pull/2222#discussion_r349110246
 
 

 ##########
 File path: 
fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
 ##########
 @@ -578,78 +686,23 @@ private void clearBackendTransactions(TransactionState 
transactionState) {
         }
     }
 
-    /*
-     * get all txns which is ready to publish
-     * a ready-to-publish txn's partition's visible version should be ONE less 
than txn's commit version.
-     */
-    public List<TransactionState> getReadyToPublishTransactions() throws 
UserException {
-        List<TransactionState> readyPublishTransactionState = new 
ArrayList<>();
-        List<TransactionState> allCommittedTransactionState = null;
+
+    public List<TransactionState> getReadyToPublishTransactions() {
+        List<TransactionState> allCommittedTransactionState;
         writeLock();
         try {
-            // only send task to committed transaction
-            allCommittedTransactionState = 
idToTransactionState.values().stream()
+            allCommittedTransactionState = 
idToTransactionState.values().parallelStream()
                     .filter(transactionState -> 
(transactionState.getTransactionStatus() == TransactionStatus.COMMITTED))
                     .collect(Collectors.toList());
-            for (TransactionState transactionState : 
allCommittedTransactionState) {
-                long dbId = transactionState.getDbId();
-                Database db = catalog.getDb(dbId);
-                if (null == db) {
-                    
transactionState.setTransactionStatus(TransactionStatus.ABORTED);
-                    unprotectUpsertTransactionState(transactionState);
-                    continue;
-                }
-            }
         } finally {
             writeUnlock();
         }
-        
-        for (TransactionState transactionState : allCommittedTransactionState) 
{
-            boolean meetPublishPredicate = true;
-            long dbId = transactionState.getDbId();
-            Database db = catalog.getDb(dbId);
-            if (null == db) {
-                continue;
-            }
-            db.readLock();
-            try {
-                readLock();
-                try {
-                    for (TableCommitInfo tableCommitInfo : 
transactionState.getIdToTableCommitInfos().values()) {
-                        OlapTable table = (OlapTable) 
db.getTable(tableCommitInfo.getTableId());
-                        if (null == table) {
-                            LOG.warn("table {} is dropped after commit, ignore 
this table",
-                                     tableCommitInfo.getTableId());
-                            continue;
-                        }
-                        for (PartitionCommitInfo partitionCommitInfo : 
tableCommitInfo.getIdToPartitionCommitInfo().values()) {
-                            Partition partition = 
table.getPartition(partitionCommitInfo.getPartitionId());
-                            if (null == partition) {
-                                LOG.warn("partition {} is dropped after 
commit, ignore this partition",
-                                         partitionCommitInfo.getPartitionId());
-                                continue;
-                            }
-                            if (partitionCommitInfo.getVersion() != 
partition.getVisibleVersion() + 1) {
-                                meetPublishPredicate = false;
-                                break;
-                            }
-                        }
-                        if (!meetPublishPredicate) {
-                            break;
-                        }
-                    }
-                    if (meetPublishPredicate) {
-                        LOG.debug("transaction [{}] is ready to publish", 
transactionState);
-                        readyPublishTransactionState.add(transactionState);
-                    }
-                } finally {
-                    readUnlock();
-                }
-            } finally {
-                db.readUnlock();
-            }
-        }
-        return readyPublishTransactionState;
+
+        LOG.debug("there are {} transactions is COMMITTED", 
allCommittedTransactionState.size());
 
 Review comment:
   ```suggestion
           LOG.debug("there are {} transactions in COMMITTED status", 
allCommittedTransactionState.size());
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to