This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 2e1e2b308fd27c809708de269a28495546bc9049
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Mon Mar 28 10:22:17 2022 +0800

    [fix](mini-load) Remove mini load in LOADING and PENDING state (#8649)
    
    1. Remove some unused code.
    2. Handle mini load jobs that are stuck in a wrong state.
        1. For historical reasons, some mini load jobs in LOADING state have never been cleared.
            As a result, new load jobs cannot be committed.
        2. If a mini load job is created right before an FE restart, the job will stay in PENDING state forever.
            Such jobs should eventually be removed.
---
 .../org/apache/doris/load/loadv2/LoadManager.java  | 146 +++------------------
 1 file changed, 21 insertions(+), 125 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index ae0f97c..64e0973 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -36,7 +36,6 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
-import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.FailMsg;
 import org.apache.doris.load.FailMsg.CancelType;
@@ -45,9 +44,7 @@ import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TMiniLoadBeginRequest;
 import org.apache.doris.thrift.TMiniLoadRequest;
 import org.apache.doris.thrift.TUniqueId;
-import org.apache.doris.transaction.GlobalTransactionMgr;
 import org.apache.doris.transaction.TransactionState;
-import org.apache.doris.transaction.TransactionStatus;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
@@ -359,32 +356,6 @@ public class LoadManager implements Writable {
         }
     }
 
-    public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException {
-        Database db = 
Catalog.getCurrentCatalog().getDbOrDdlException(stmt.getDbName());
-
-        LoadJob loadJob = null;
-        readLock();
-        try {
-            Map<String, List<LoadJob>> labelToLoadJobs = 
dbIdToLabelToLoadJobs.get(db.getId());
-            if (labelToLoadJobs == null) {
-                throw new DdlException("Load job does not exist");
-            }
-            List<LoadJob> loadJobList = labelToLoadJobs.get(stmt.getLabel());
-            if (loadJobList == null) {
-                throw new DdlException("Load job does not exist");
-            }
-            Optional<LoadJob> loadJobOptional = 
loadJobList.stream().filter(entity -> !entity.isTxnDone()).findFirst();
-            if (!loadJobOptional.isPresent()) {
-                throw new DdlException("There is no uncompleted job which 
label is " + stmt.getLabel());
-            }
-            loadJob = loadJobOptional.get();
-        } finally {
-            readUnlock();
-        }
-
-        loadJob.cancelJob(new FailMsg(FailMsg.CancelType.USER_CANCEL, "user 
cancel"));
-    }
-
     public void replayEndLoadJob(LoadJobFinalOperation operation) {
         LoadJob job = idToLoadJob.get(operation.getId());
         if (job == null) {
@@ -683,102 +654,6 @@ public class LoadManager implements Writable {
         }
     }
 
-    @Deprecated
-    // Deprecated in version 0.12
-    // This method is only for bug fix. And should be call after image and 
edit log are replayed.
-    public void fixLoadJobMetaBugs(GlobalTransactionMgr txnMgr) {
-        for (LoadJob job : idToLoadJob.values()) {
-            /*
-             * Bug 1:
-             * in previous implementation, there is a bug that when the job's 
corresponding transaction is
-             * COMMITTED but not VISIBLE, the load job's state is LOADING, so 
that the job may be CANCELLED
-             * by timeout checker, which is not right.
-             * So here we will check each LOADING load jobs' txn status, if it 
is COMMITTED, change load job's
-             * state to COMMITTED.
-             * this method should be removed at next upgrading.
-             * only mini load job will be in LOADING state when persist, 
because mini load job is executed before writing
-             * edit log.
-             */
-            if (job.getState() == JobState.LOADING) {
-                // unfortunately, transaction id in load job is also not 
persisted, so we have to traverse
-                // all transactions to find it.
-                TransactionState txn = 
txnMgr.getTransactionStateByCallbackIdAndStatus(job.getDbId(), 
job.getCallbackId(),
-                        Sets.newHashSet(TransactionStatus.COMMITTED));
-                if (txn != null) {
-                    job.updateState(JobState.COMMITTED);
-                    LOG.info("transfer load job {} state from LOADING to 
COMMITTED, because txn {} is COMMITTED."
-                            + " label: {}, db: {}", job.getId(), 
txn.getTransactionId(), job.getLabel(), job.getDbId());
-                    continue;
-                }
-            }
-
-            /*
-             * Bug 2:
-             * There is bug in Doris version 0.10.15. When a load job in 
PENDING or LOADING
-             * state was replayed from image (not through the edit log), we 
forgot to add
-             * the corresponding callback id in the CallbackFactory. As a 
result, the
-             * subsequent finish txn edit logs cannot properly finish the job 
during the
-             * replay process. This results in that when the FE restarts, 
these load jobs
-             * that should have been completed are re-entered into the pending 
state,
-             * resulting in repeated submission load tasks.
-             *
-             * Those wrong images are unrecoverable, so that we have to cancel 
all load jobs
-             * in PENDING or LOADING state when restarting FE, to avoid submit 
jobs
-             * repeatedly.
-             *
-             * This code can be remove when upgrading from 0.11.x to future 
version.
-             */
-            if (job.getState() == JobState.LOADING || job.getState() == 
JobState.PENDING) {
-                JobState prevState = job.getState();
-                TransactionState txn = 
txnMgr.getTransactionStateByCallbackId(job.getDbId(), job.getCallbackId());
-                if (txn != null) {
-                    // the txn has already been committed or visible, change 
job's state to committed or finished
-                    if (txn.getTransactionStatus() == 
TransactionStatus.COMMITTED) {
-                        job.updateState(JobState.COMMITTED);
-                        LOG.info("transfer load job {} state from {} to 
COMMITTED, because txn {} is COMMITTED",
-                                job.getId(), prevState, 
txn.getTransactionId());
-                    } else if (txn.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
-                        job.updateState(JobState.FINISHED);
-                        LOG.info("transfer load job {} state from {} to 
FINISHED, because txn {} is VISIBLE",
-                                job.getId(), prevState, 
txn.getTransactionId());
-                    } else if (txn.getTransactionStatus() == 
TransactionStatus.ABORTED) {
-                        job.cancelJobWithoutCheck(new 
FailMsg(CancelType.LOAD_RUN_FAIL, "fe restart"), false, false);
-                        LOG.info("transfer load job {} state from {} to 
CANCELLED, because txn {} is ABORTED",
-                                job.getId(), prevState, 
txn.getTransactionId());
-                    } else {
-                        // pending txn, do nothing
-                    }
-                    continue;
-                }
-
-                if (job.getJobType() == EtlJobType.MINI) {
-                    // for mini load job, just set it as CANCELLED, because 
mini load is a synchronous load.
-                    // it would be failed if FE restart.
-                    job.cancelJobWithoutCheck(new 
FailMsg(CancelType.LOAD_RUN_FAIL, "fe restart"), false, false);
-                    LOG.info("transfer mini load job {} state from {} to 
CANCELLED, because transaction status is unknown"
-                                    + ". label: {}, db: {}",
-                            job.getId(), prevState, job.getLabel(), 
job.getDbId());
-                } else {
-                    // txn is not found. here are 2 cases:
-                    // 1. txn is not start yet, so we can just set job to 
CANCELLED, and user need to submit the job again.
-                    // 2. because of the bug, txn is ABORTED of VISIBLE, and 
job is not finished. and txn is expired and
-                    //    be removed from transaction manager. So we don't 
know this job is finished or cancelled.
-                    //    in this case, the job should has been submitted long 
ago (otherwise the txn could not have been 
-                    //    removed by expiration). 
-                    // Without affecting the first case of job, we set the job 
finish time to be the same as the create time. 
-                    // In this way, the second type of job will be 
automatically cleared after running removeOldLoadJob();
-
-                    // use CancelType.UNKNOWN, so that we can set finish time 
to be the same as the create time
-                    job.cancelJobWithoutCheck(new 
FailMsg(CancelType.TXN_UNKNOWN, "transaction status is unknown"), false, false);
-                    LOG.info("finish load job {} from {} to CANCELLED, because 
transaction status is unknown. label: {}, db: {}, create: {}",
-                            job.getId(), prevState, job.getLabel(), 
job.getDbId(), TimeUtils.longToTimeString(job.getCreateTimestamp()));
-                }
-            }
-        }
-
-        removeOldLoadJob();
-    }
-
     @Override
     public void write(DataOutput out) throws IOException {
         long currentTimeMs = System.currentTimeMillis();
@@ -798,6 +673,27 @@ public class LoadManager implements Writable {
             if (loadJob.isExpired(currentTimeMs)) {
                 continue;
             }
+
+            if (loadJob.getJobType() == EtlJobType.MINI) {
+                // This is a bug fix. the mini load job should not with state 
LOADING.
+                if (loadJob.getState() == JobState.LOADING) {
+                    LOG.warn("skip mini load job {} in db {} with LOADING 
state", loadJob.getId(), loadJob.getDbId());
+                    continue;
+                }
+
+                if (loadJob.getState() == JobState.PENDING) {
+                    // bad case. When a mini load job is created and then FE 
restart.
+                    // the job will be in PENDING state forever.
+                    // This is a temp solution to remove these jobs. And the 
mini load job should be deprecated in Doris v1.1
+                    TransactionState state = 
Catalog.getCurrentCatalog().getGlobalTransactionMgr().getTransactionState(
+                            loadJob.getDbId(), loadJob.getTransactionId());
+                    if (state == null) {
+                        LOG.warn("skip mini load job {} in db {} with PENDING 
state and with txn: {}",
+                                loadJob.getId(), loadJob.getDbId(), 
loadJob.getTransactionId());
+                        continue;
+                    }
+                }
+            }
             idToLoadJob.put(loadJob.getId(), loadJob);
             Map<String, List<LoadJob>> map = 
dbIdToLabelToLoadJobs.get(loadJob.getDbId());
             if (map == null) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to