This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.1 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 2e1e2b308fd27c809708de269a28495546bc9049 Author: Mingyu Chen <morningman....@gmail.com> AuthorDate: Mon Mar 28 10:22:17 2022 +0800 [fix](mini-load) Remove mini load in LOADING and PENDING state (#8649) 1. Remove some unused code. 2. Handle mini load jobs in a wrong state: 1. For some historical reasons, some mini load jobs in LOADING state have not been cleared. As a result, new load jobs cannot be committed. 2. If a mini load job is created right before FE restarts, the mini load job will stay in PENDING state forever, although it should eventually be removed. --- .../org/apache/doris/load/loadv2/LoadManager.java | 146 +++------------------ 1 file changed, 21 insertions(+), 125 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index ae0f97c..64e0973 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -36,7 +36,6 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; -import org.apache.doris.common.util.TimeUtils; import org.apache.doris.load.EtlJobType; import org.apache.doris.load.FailMsg; import org.apache.doris.load.FailMsg.CancelType; @@ -45,9 +44,7 @@ import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TMiniLoadBeginRequest; import org.apache.doris.thrift.TMiniLoadRequest; import org.apache.doris.thrift.TUniqueId; -import org.apache.doris.transaction.GlobalTransactionMgr; import org.apache.doris.transaction.TransactionState; -import org.apache.doris.transaction.TransactionStatus; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -359,32 +356,6 @@ public class LoadManager implements Writable { } } - public void cancelLoadJob(CancelLoadStmt stmt) throws 
DdlException { - Database db = Catalog.getCurrentCatalog().getDbOrDdlException(stmt.getDbName()); - - LoadJob loadJob = null; - readLock(); - try { - Map<String, List<LoadJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId()); - if (labelToLoadJobs == null) { - throw new DdlException("Load job does not exist"); - } - List<LoadJob> loadJobList = labelToLoadJobs.get(stmt.getLabel()); - if (loadJobList == null) { - throw new DdlException("Load job does not exist"); - } - Optional<LoadJob> loadJobOptional = loadJobList.stream().filter(entity -> !entity.isTxnDone()).findFirst(); - if (!loadJobOptional.isPresent()) { - throw new DdlException("There is no uncompleted job which label is " + stmt.getLabel()); - } - loadJob = loadJobOptional.get(); - } finally { - readUnlock(); - } - - loadJob.cancelJob(new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel")); - } - public void replayEndLoadJob(LoadJobFinalOperation operation) { LoadJob job = idToLoadJob.get(operation.getId()); if (job == null) { @@ -683,102 +654,6 @@ public class LoadManager implements Writable { } } - @Deprecated - // Deprecated in version 0.12 - // This method is only for bug fix. And should be call after image and edit log are replayed. - public void fixLoadJobMetaBugs(GlobalTransactionMgr txnMgr) { - for (LoadJob job : idToLoadJob.values()) { - /* - * Bug 1: - * in previous implementation, there is a bug that when the job's corresponding transaction is - * COMMITTED but not VISIBLE, the load job's state is LOADING, so that the job may be CANCELLED - * by timeout checker, which is not right. - * So here we will check each LOADING load jobs' txn status, if it is COMMITTED, change load job's - * state to COMMITTED. - * this method should be removed at next upgrading. - * only mini load job will be in LOADING state when persist, because mini load job is executed before writing - * edit log. 
- */ - if (job.getState() == JobState.LOADING) { - // unfortunately, transaction id in load job is also not persisted, so we have to traverse - // all transactions to find it. - TransactionState txn = txnMgr.getTransactionStateByCallbackIdAndStatus(job.getDbId(), job.getCallbackId(), - Sets.newHashSet(TransactionStatus.COMMITTED)); - if (txn != null) { - job.updateState(JobState.COMMITTED); - LOG.info("transfer load job {} state from LOADING to COMMITTED, because txn {} is COMMITTED." - + " label: {}, db: {}", job.getId(), txn.getTransactionId(), job.getLabel(), job.getDbId()); - continue; - } - } - - /* - * Bug 2: - * There is bug in Doris version 0.10.15. When a load job in PENDING or LOADING - * state was replayed from image (not through the edit log), we forgot to add - * the corresponding callback id in the CallbackFactory. As a result, the - * subsequent finish txn edit logs cannot properly finish the job during the - * replay process. This results in that when the FE restarts, these load jobs - * that should have been completed are re-entered into the pending state, - * resulting in repeated submission load tasks. - * - * Those wrong images are unrecoverable, so that we have to cancel all load jobs - * in PENDING or LOADING state when restarting FE, to avoid submit jobs - * repeatedly. - * - * This code can be remove when upgrading from 0.11.x to future version. 
- */ - if (job.getState() == JobState.LOADING || job.getState() == JobState.PENDING) { - JobState prevState = job.getState(); - TransactionState txn = txnMgr.getTransactionStateByCallbackId(job.getDbId(), job.getCallbackId()); - if (txn != null) { - // the txn has already been committed or visible, change job's state to committed or finished - if (txn.getTransactionStatus() == TransactionStatus.COMMITTED) { - job.updateState(JobState.COMMITTED); - LOG.info("transfer load job {} state from {} to COMMITTED, because txn {} is COMMITTED", - job.getId(), prevState, txn.getTransactionId()); - } else if (txn.getTransactionStatus() == TransactionStatus.VISIBLE) { - job.updateState(JobState.FINISHED); - LOG.info("transfer load job {} state from {} to FINISHED, because txn {} is VISIBLE", - job.getId(), prevState, txn.getTransactionId()); - } else if (txn.getTransactionStatus() == TransactionStatus.ABORTED) { - job.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL, "fe restart"), false, false); - LOG.info("transfer load job {} state from {} to CANCELLED, because txn {} is ABORTED", - job.getId(), prevState, txn.getTransactionId()); - } else { - // pending txn, do nothing - } - continue; - } - - if (job.getJobType() == EtlJobType.MINI) { - // for mini load job, just set it as CANCELLED, because mini load is a synchronous load. - // it would be failed if FE restart. - job.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL, "fe restart"), false, false); - LOG.info("transfer mini load job {} state from {} to CANCELLED, because transaction status is unknown" - + ". label: {}, db: {}", - job.getId(), prevState, job.getLabel(), job.getDbId()); - } else { - // txn is not found. here are 2 cases: - // 1. txn is not start yet, so we can just set job to CANCELLED, and user need to submit the job again. - // 2. because of the bug, txn is ABORTED of VISIBLE, and job is not finished. and txn is expired and - // be removed from transaction manager. 
So we don't know this job is finished or cancelled. - // in this case, the job should has been submitted long ago (otherwise the txn could not have been - // removed by expiration). - // Without affecting the first case of job, we set the job finish time to be the same as the create time. - // In this way, the second type of job will be automatically cleared after running removeOldLoadJob(); - - // use CancelType.UNKNOWN, so that we can set finish time to be the same as the create time - job.cancelJobWithoutCheck(new FailMsg(CancelType.TXN_UNKNOWN, "transaction status is unknown"), false, false); - LOG.info("finish load job {} from {} to CANCELLED, because transaction status is unknown. label: {}, db: {}, create: {}", - job.getId(), prevState, job.getLabel(), job.getDbId(), TimeUtils.longToTimeString(job.getCreateTimestamp())); - } - } - } - - removeOldLoadJob(); - } - @Override public void write(DataOutput out) throws IOException { long currentTimeMs = System.currentTimeMillis(); @@ -798,6 +673,27 @@ public class LoadManager implements Writable { if (loadJob.isExpired(currentTimeMs)) { continue; } + + if (loadJob.getJobType() == EtlJobType.MINI) { + // This is a bug fix. A mini load job should not be in LOADING state. + if (loadJob.getState() == JobState.LOADING) { + LOG.warn("skip mini load job {} in db {} with LOADING state", loadJob.getId(), loadJob.getDbId()); + continue; + } + + if (loadJob.getState() == JobState.PENDING) { + // Bad case: when a mini load job is created and then FE restarts, + // the job will stay in PENDING state forever. + // This is a temp solution to remove these jobs.
And the mini load job should be deprecated in Doris v1.1 + TransactionState state = Catalog.getCurrentCatalog().getGlobalTransactionMgr().getTransactionState( + loadJob.getDbId(), loadJob.getTransactionId()); + if (state == null) { + LOG.warn("skip mini load job {} in db {} with PENDING state and with txn: {}", + loadJob.getId(), loadJob.getDbId(), loadJob.getTransactionId()); + continue; + } + } + } idToLoadJob.put(loadJob.getId(), loadJob); Map<String, List<LoadJob>> map = dbIdToLabelToLoadJobs.get(loadJob.getDbId()); if (map == null) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org