This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new bdd5e953192 branch-3.0: [Fix](job)Fix CAS competition failure leading to message publishing failure. #45018 (#45029)
bdd5e953192 is described below

commit bdd5e9531923cb96e10bc3f453a253e45e22ea63
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sun Dec 8 00:28:38 2024 -0800

    branch-3.0: [Fix](job)Fix CAS competition failure leading to message publishing failure. #45018 (#45029)
    
    Cherry-picked from #45018
    
    Co-authored-by: Calvin Kirs <guoqi...@selectdb.com>
---
 .../java/org/apache/doris/job/disruptor/TaskDisruptor.java    | 11 ++++++++++-
 .../java/org/apache/doris/job/scheduler/JobScheduler.java     |  4 +++-
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java
index 6ca2924c593..2b2e3df0418 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java
@@ -74,7 +74,16 @@ public class TaskDisruptor<T> {
     public boolean publishEvent(Object... args) {
         try {
             RingBuffer<T> ringBuffer = disruptor.getRingBuffer();
-            return ringBuffer.tryPublishEvent(eventTranslator, args);
+            // Check if the RingBuffer has enough capacity to reserve 10 slots for tasks
+            // If there is insufficient capacity (less than 10 slots available)
+            // log a warning and drop the current task
+            if (!ringBuffer.hasAvailableCapacity(10)) {
+                LOG.warn("ring buffer has no available capacity,task will be dropped,"
+                        + "please check the task queue size.");
+                return false;
+            }
+            ringBuffer.publishEvent(eventTranslator, args);
+            return true;
         } catch (Exception e) {
             LOG.warn("Failed to publish event", e);
             // Handle the exception, e.g., retry or alert
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
index c43376de304..921f333791c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
@@ -173,7 +173,9 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable {
         for (AbstractTask task : tasks) {
             if (!taskDisruptorGroupManager.dispatchInstantTask(task, job.getJobType(),
                     job.getJobConfig())) {
-                throw new JobException(job.formatMsgWhenExecuteQueueFull(task.getTaskId()));
+                String errorMsg = job.formatMsgWhenExecuteQueueFull(task.getTaskId());
+                task.onFail(errorMsg);
+                throw new JobException(errorMsg);
 
             }
             log.info("dispatch instant job, job id is {}, job name is {}, task id is {}", job.getJobId(),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to