This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 4fd46e2d132 [Fix](delete) Fix delete job timeout when executing `delete from ...` (#37363) 4fd46e2d132 is described below commit 4fd46e2d132e3a4cacd722f81eba5d733f545615 Author: bobhan1 <bh2444151...@outlook.com> AuthorDate: Sat Jul 6 16:55:57 2024 +0800 [Fix](delete) Fix delete job timeout when executing `delete from ...` (#37363) ## Proposed changes Currently, when FE executes a delete job, it sends a `REALTIME_PUSH` task to all affected replicas and **waits for all asynchronous tasks sent to the backend to return successful status results** or until the timeout (which is at least 30s for a delete job) expires. If some replica fails to do the job and reports an error for the task to FE, FE will retry the task on that replica. However, for some errors like `DELETE_INVALID_CONDITION`/`DELETE_INVALID_PARAMETERS`, we should fail and abort the delete job on FE directly and report the errors to users, rather than keep retrying in vain. So this PR lets the delete job fail and abort directly on FE and reports the errors to users if FE receives one of the above errors from BE. 
--- be/src/olap/delete_handler.cpp | 5 ++ .../main/java/org/apache/doris/load/DeleteJob.java | 6 +++ .../java/org/apache/doris/master/MasterImpl.java | 28 +++++++--- .../main/java/org/apache/doris/task/PushTask.java | 12 +++++ .../test_delete_from_timeout.groovy | 59 ++++++++++++++++++++++ 5 files changed, 104 insertions(+), 6 deletions(-) diff --git a/be/src/olap/delete_handler.cpp b/be/src/olap/delete_handler.cpp index 73a2e3b1967..d40e7faafc2 100644 --- a/be/src/olap/delete_handler.cpp +++ b/be/src/olap/delete_handler.cpp @@ -35,6 +35,7 @@ #include "olap/predicate_creator.h" #include "olap/tablet_schema.h" #include "olap/utils.h" +#include "util/debug_points.h" using apache::thrift::ThriftDebugString; using std::vector; @@ -90,6 +91,10 @@ std::string trans_op(const std::string& opt) { Status DeleteHandler::generate_delete_predicate(const TabletSchema& schema, const std::vector<TCondition>& conditions, DeletePredicatePB* del_pred) { + DBUG_EXECUTE_IF("DeleteHandler::generate_delete_predicate.inject_failure", { + return Status::Error<false>(dp->param<int>("error_code"), + dp->param<std::string>("error_msg")); + }) if (conditions.empty()) { return Status::Error<DELETE_INVALID_PARAMETERS>( "invalid parameters for store_cond. 
condition_size={}", conditions.size()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java index f3915bc6f4c..4ccbfa44d8c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java @@ -379,6 +379,12 @@ public class DeleteJob extends AbstractTxnStateChangeCallback implements DeleteJ long timeoutMs = getTimeoutMs(); boolean ok = countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS); if (ok) { + if (!countDownLatch.getStatus().ok()) { + // encounter some errors that don't need to retry, abort directly + LOG.warn("delete job failed, errmsg={}", countDownLatch.getStatus().getErrorMsg()); + throw new UserException(String.format("delete job failed, errmsg:%s", + countDownLatch.getStatus().getErrorMsg())); + } return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java index 8be450404a0..47e6f1cb303 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java @@ -140,7 +140,8 @@ public class MasterImpl { && taskType != TTaskType.CLONE && taskType != TTaskType.PUBLISH_VERSION && taskType != TTaskType.CREATE && taskType != TTaskType.UPDATE_TABLET_META_INFO && taskType != TTaskType.STORAGE_MEDIUM_MIGRATE - && taskType != TTaskType.CALCULATE_DELETE_BITMAP) { + && taskType != TTaskType.CALCULATE_DELETE_BITMAP + && taskType != TTaskType.REALTIME_PUSH) { return result; } } @@ -153,7 +154,6 @@ public class MasterImpl { finishCreateReplica(task, request); break; case REALTIME_PUSH: - checkHasTabletInfo(request); Preconditions.checkState(request.isSetReportVersion()); finishRealtimePush(task, request); break; @@ -301,16 +301,32 @@ public class MasterImpl { } } - private void finishRealtimePush(AgentTask task, TFinishTaskRequest 
request) { - List<TTabletInfo> finishTabletInfos = request.getFinishTabletInfos(); - Preconditions.checkState(finishTabletInfos != null && !finishTabletInfos.isEmpty()); - + private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) throws Exception { PushTask pushTask = (PushTask) task; long dbId = pushTask.getDbId(); long backendId = pushTask.getBackendId(); long signature = task.getSignature(); long transactionId = ((PushTask) task).getTransactionId(); + + if (request.getTaskStatus().getStatusCode() != TStatusCode.OK) { + if (pushTask.getPushType() == TPushType.DELETE) { + // DeleteHandler may return status code DELETE_INVALID_CONDITION and DELETE_INVALID_PARAMETERS, + // we don't need to retry if meet them. + // note that they will be converted to TStatusCode.INTERNAL_ERROR when being sent from be to fe + if (request.getTaskStatus().getStatusCode() == TStatusCode.INTERNAL_ERROR) { + pushTask.countDownToZero(request.getTaskStatus().getStatusCode(), + task.getBackendId() + ": " + request.getTaskStatus().getErrorMsgs().toString()); + AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature); + LOG.warn("finish push replica error: {}", request.getTaskStatus().getErrorMsgs().toString()); + } + } + return; + } + + checkHasTabletInfo(request); + List<TTabletInfo> finishTabletInfos = request.getFinishTabletInfos(); + Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); if (db == null) { AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java index 5cf428e7228..0dadef4dee1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java @@ -26,6 +26,7 @@ import org.apache.doris.analysis.LiteralExpr; import org.apache.doris.analysis.Predicate; import org.apache.doris.analysis.SlotRef; import 
org.apache.doris.common.MarkedCountDownLatch; +import org.apache.doris.common.Status; import org.apache.doris.thrift.TBrokerScanRange; import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TCondition; @@ -34,6 +35,7 @@ import org.apache.doris.thrift.TPriority; import org.apache.doris.thrift.TPushReq; import org.apache.doris.thrift.TPushType; import org.apache.doris.thrift.TResourceInfo; +import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TTaskType; import com.google.common.collect.Maps; @@ -218,6 +220,16 @@ public class PushTask extends AgentTask { } } + // call this always means one of tasks is failed. count down to zero to finish entire task + public void countDownToZero(TStatusCode code, String errMsg) { + if (this.latch != null) { + latch.countDownToZero(new Status(code, errMsg)); + if (LOG.isDebugEnabled()) { + LOG.debug("PushTask count down to zero. error msg: {}", errMsg); + } + } + } + public long getReplicaId() { return replicaId; } diff --git a/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy b/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy new file mode 100644 index 00000000000..2d5bf41b3db --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_delete_from_timeout","nonConcurrent") { + + def tableName = "test_delete_from_timeout" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ CREATE TABLE ${tableName} ( + `col1` BOOLEAN NOT NULL, + `col2` DECIMAL(17, 1) NOT NULL, + `col3` INT NOT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`col1`, `col2`, `col3`) + DISTRIBUTED BY HASH(`col1`, `col2`, `col3`) BUCKETS 4 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1") + """ + + GetDebugPoint().clearDebugPointsForAllBEs() + + try { + sql "insert into ${tableName} values(1, 99.9, 234);" + GetDebugPoint().enableDebugPointForAllBEs("DeleteHandler::generate_delete_predicate.inject_failure", + [error_code: -1900 /* DELETE_INVALID_CONDITION */, error_msg: "data type is float or double."]) + test { + sql """delete from ${tableName} where col1 = "false" and col2 = "-9999782574499444.2" and col3 = "-25"; """ + exception "data type is float or double." + } + + GetDebugPoint().clearDebugPointsForAllBEs() + + GetDebugPoint().enableDebugPointForAllBEs("DeleteHandler::generate_delete_predicate.inject_failure", + [error_code: -1903 /* DELETE_INVALID_PARAMETERS */, error_msg: "invalid parameters for store_cond. condition_size=1"]) + test { + sql """delete from ${tableName} where col1 = "false" and col2 = "-9999782574499444.2" and col3 = "-25"; """ + exception "invalid parameters for store_cond. 
condition_size=1" + } + } catch (Exception e) { + logger.info(e.getMessage()) + AssertTrue(false) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("DeleteHandler::generate_delete_predicate.inject_failure") + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org