This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new a7fe2ae9010 [branch-2.1][Fix](Job)Replace BlockingWaitStrategy with 
LiteTimeoutBlockingWaitStrategy to avoid deadlock issues. (#40625) (#40707)
a7fe2ae9010 is described below

commit a7fe2ae9010fe882dcad3e456c6717d2b666b440
Author: Calvin Kirs <k...@apache.org>
AuthorDate: Thu Sep 12 22:42:45 2024 +0800

    [branch-2.1][Fix](Job)Replace BlockingWaitStrategy with 
LiteTimeoutBlockingWaitStrategy to avoid deadlock issues. (#40625) (#40707)
    
    …
    
    FYI https://issues.apache.org/jira/browse/LOG4J2-1221
    
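    - BlockingWaitStrategy is a Disruptor wait strategy that parks consumer
    threads on a lock and condition variable until the sequence they are
    waiting for has been published.
    
    A consumer blocked this way has no timeout. If a wake-up signal is
    missed, the thread can stay blocked forever, which shows up as a
    deadlock. (Producers calling publishEvent() can also block when the ring
    buffer is full; this change switches the job TaskDisruptor to
    tryPublishEvent(), which returns false instead of blocking.)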
    Timeout Handling:
    
    - LiteTimeoutBlockingWaitStrategy adds a timeout (10 ms as configured
    here) to the wait. If the awaited sequence is not published within that
    period, the waiting thread wakes up and re-checks, so a missed signal can
    no longer block it indefinitely.
    Reduced Risk of Deadlocks:
    
    - By avoiding indefinite blocking, this strategy reduces the risk of
    deadlocks caused by threads waiting on each other. The timeout lets the
    system recover when resources are only temporarily unavailable.
    
    (cherry picked from commit 087048f2b2f06eae15e80e7ffa209610c9e3f173)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
---
 .../apache/doris/job/disruptor/TaskDisruptor.java  | 15 ++++++++++++---
 .../doris/job/executor/DispatchTaskHandler.java    |  5 ++++-
 .../doris/job/executor/TimerJobSchedulerTask.java  |  5 ++++-
 .../job/manager/TaskDisruptorGroupManager.java     | 22 +++++++++++-----------
 .../apache/doris/job/scheduler/JobScheduler.java   | 13 ++++++++-----
 .../doris/scheduler/disruptor/TaskDisruptor.java   | 10 +++++-----
 6 files changed, 44 insertions(+), 26 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java
index 45564e99b17..6ca2924c593 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java
@@ -24,6 +24,8 @@ import com.lmax.disruptor.WaitStrategy;
 import com.lmax.disruptor.WorkHandler;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.util.concurrent.ThreadFactory;
 
@@ -33,6 +35,7 @@ import java.util.concurrent.ThreadFactory;
  * @param <T> the type of the event handled by the Disruptor
  */
 public class TaskDisruptor<T> {
+    private static final Logger LOG = 
LogManager.getLogger(TaskDisruptor.class);
     private final Disruptor<T> disruptor;
     private final EventTranslatorVararg<T> eventTranslator;
 
@@ -68,9 +71,15 @@ public class TaskDisruptor<T> {
      *
      * @param args the arguments for the event
      */
-    public void publishEvent(Object... args) {
-        RingBuffer<T> ringBuffer = disruptor.getRingBuffer();
-        ringBuffer.publishEvent(eventTranslator, args);
+    public boolean publishEvent(Object... args) {
+        try {
+            RingBuffer<T> ringBuffer = disruptor.getRingBuffer();
+            return ringBuffer.tryPublishEvent(eventTranslator, args);
+        } catch (Exception e) {
+            LOG.warn("Failed to publish event", e);
+            // Handle the exception, e.g., retry or alert
+        }
+        return false;
     }
 
     /**
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
index e5933d133cb..d93393aa0ef 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
@@ -66,7 +66,10 @@ public class DispatchTaskHandler<T extends AbstractJob> 
implements WorkHandler<T
                 }
                 JobType jobType = event.getJob().getJobType();
                 for (AbstractTask task : tasks) {
-                    disruptorMap.get(jobType).publishEvent(task, 
event.getJob().getJobConfig());
+                    if (!disruptorMap.get(jobType).publishEvent(task, 
event.getJob().getJobConfig())) {
+                        task.cancel();
+                        continue;
+                    }
                     log.info("dispatch timer job success, job id is {},  task 
id is {}",
                             event.getJob().getJobId(), task.getTaskId());
                 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java
index 74efe49beb1..25bbccf3fa2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java
@@ -44,7 +44,10 @@ public class TimerJobSchedulerTask<T extends AbstractJob> 
implements TimerTask {
                 log.info("job status is not running, job id is {}, skip 
dispatch", this.job.getJobId());
                 return;
             }
-            dispatchDisruptor.publishEvent(this.job);
+            if (!dispatchDisruptor.publishEvent(this.job)) {
+                log.warn("dispatch timer job failed, job id is {}, job name is 
{}",
+                        this.job.getJobId(), this.job.getJobName());
+            }
         } catch (Exception e) {
             log.warn("dispatch timer job error, task id is {}", 
this.job.getJobId(), e);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java
index 4e31e467013..b1ccb976443 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java
@@ -31,15 +31,16 @@ import org.apache.doris.job.extensions.insert.InsertTask;
 import org.apache.doris.job.extensions.mtmv.MTMVTask;
 import org.apache.doris.job.task.AbstractTask;
 
-import com.lmax.disruptor.BlockingWaitStrategy;
 import com.lmax.disruptor.EventFactory;
 import com.lmax.disruptor.EventTranslatorVararg;
+import com.lmax.disruptor.LiteTimeoutBlockingWaitStrategy;
 import com.lmax.disruptor.WorkHandler;
 import lombok.Getter;
 
 import java.util.EnumMap;
 import java.util.Map;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 
 public class TaskDisruptorGroupManager<T extends AbstractTask> {
 
@@ -86,7 +87,8 @@ public class TaskDisruptorGroupManager<T extends 
AbstractTask> {
                 (event, sequence, args) -> event.setJob((AbstractJob) args[0]);
         this.dispatchDisruptor = new TaskDisruptor<>(dispatchEventFactory, 
DISPATCH_TIMER_JOB_QUEUE_SIZE,
                 dispatchThreadFactory,
-                new BlockingWaitStrategy(), dispatchTaskExecutorHandlers, 
eventTranslator);
+                new LiteTimeoutBlockingWaitStrategy(10, TimeUnit.MILLISECONDS),
+                dispatchTaskExecutorHandlers, eventTranslator);
     }
 
     private void registerInsertDisruptor() {
@@ -102,7 +104,8 @@ public class TaskDisruptorGroupManager<T extends 
AbstractTask> {
                     event.setJobConfig((JobExecutionConfiguration) args[1]);
                 };
         TaskDisruptor insertDisruptor = new 
TaskDisruptor<>(insertEventFactory, DISPATCH_INSERT_TASK_QUEUE_SIZE,
-                insertTaskThreadFactory, new BlockingWaitStrategy(), 
insertTaskExecutorHandlers, eventTranslator);
+                insertTaskThreadFactory, new 
LiteTimeoutBlockingWaitStrategy(10, TimeUnit.MILLISECONDS),
+                insertTaskExecutorHandlers, eventTranslator);
         disruptorMap.put(JobType.INSERT, insertDisruptor);
     }
 
@@ -119,17 +122,14 @@ public class TaskDisruptorGroupManager<T extends 
AbstractTask> {
                     event.setJobConfig((JobExecutionConfiguration) args[1]);
                 };
         TaskDisruptor mtmvDisruptor = new TaskDisruptor<>(mtmvEventFactory, 
DISPATCH_MTMV_TASK_QUEUE_SIZE,
-                mtmvTaskThreadFactory, new BlockingWaitStrategy(), 
insertTaskExecutorHandlers, eventTranslator);
+                mtmvTaskThreadFactory, new LiteTimeoutBlockingWaitStrategy(10, 
TimeUnit.MILLISECONDS),
+                insertTaskExecutorHandlers, eventTranslator);
         disruptorMap.put(JobType.MV, mtmvDisruptor);
     }
 
-    public void dispatchTimerJob(AbstractJob job) {
-        dispatchDisruptor.publishEvent(job);
-    }
-
-    public void dispatchInstantTask(AbstractTask task, JobType jobType,
-                                    JobExecutionConfiguration 
jobExecutionConfiguration) {
-        disruptorMap.get(jobType).publishEvent(task, 
jobExecutionConfiguration);
+    public boolean dispatchInstantTask(AbstractTask task, JobType jobType,
+                                       JobExecutionConfiguration 
jobExecutionConfiguration) {
+        return disruptorMap.get(jobType).publishEvent(task, 
jobExecutionConfiguration);
     }
 
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
index 33d12c30a4b..862b85597cd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
@@ -155,7 +155,7 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> 
implements Closeable {
     }
 
 
-    public void schedulerInstantJob(T job, TaskType taskType, C context) {
+    public void schedulerInstantJob(T job, TaskType taskType, C context) 
throws JobException {
         List<? extends AbstractTask> tasks = job.commonCreateTasks(taskType, 
context);
         if (CollectionUtils.isEmpty(tasks)) {
             log.info("job create task is empty, skip scheduler, job id is {}, 
job name is {}", job.getJobId(),
@@ -165,12 +165,15 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> 
implements Closeable {
             }
             return;
         }
-        tasks.forEach(task -> {
-            taskDisruptorGroupManager.dispatchInstantTask(task, 
job.getJobType(),
-                    job.getJobConfig());
+        for (AbstractTask task : tasks) {
+            if (!taskDisruptorGroupManager.dispatchInstantTask(task, 
job.getJobType(),
+                    job.getJobConfig())) {
+                throw new JobException("dispatch instant task failed, job id 
is "
+                        + job.getJobId() + ", task id is " + task.getTaskId());
+            }
             log.info("dispatch instant job, job id is {}, job name is {}, task 
id is {}", job.getJobId(),
                     job.getJobName(), task.getTaskId());
-        });
+        }
     }
 
     /**
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
index 57df84a0e89..345b31d6bc2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
@@ -21,8 +21,8 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.CustomThreadFactory;
 import org.apache.doris.scheduler.constants.TaskType;
 
-import com.lmax.disruptor.BlockingWaitStrategy;
 import com.lmax.disruptor.EventTranslatorThreeArg;
+import com.lmax.disruptor.LiteTimeoutBlockingWaitStrategy;
 import com.lmax.disruptor.TimeoutException;
 import com.lmax.disruptor.WorkHandler;
 import com.lmax.disruptor.dsl.Disruptor;
@@ -44,7 +44,7 @@ import java.util.concurrent.TimeUnit;
 @Log4j2
 public class TaskDisruptor implements Closeable {
 
-    private  Disruptor<TaskEvent> disruptor;
+    private Disruptor<TaskEvent> disruptor;
     private static final int DEFAULT_RING_BUFFER_SIZE = 
Config.async_task_queen_size;
 
     private static final int consumerThreadCount = 
Config.async_task_consumer_thread_num;
@@ -74,7 +74,7 @@ public class TaskDisruptor implements Closeable {
     public void start() {
         CustomThreadFactory exportTaskThreadFactory = new 
CustomThreadFactory("export-task-consumer");
         disruptor = new Disruptor<>(TaskEvent.FACTORY, 
DEFAULT_RING_BUFFER_SIZE, exportTaskThreadFactory,
-                ProducerType.SINGLE, new BlockingWaitStrategy());
+                ProducerType.SINGLE, new LiteTimeoutBlockingWaitStrategy(10, 
TimeUnit.MILLISECONDS));
         WorkHandler<TaskEvent>[] workers = new 
TaskHandler[consumerThreadCount];
         for (int i = 0; i < consumerThreadCount; i++) {
             workers[i] = new TaskHandler();
@@ -109,7 +109,7 @@ public class TaskDisruptor implements Closeable {
         try {
             disruptor.publishEvent(TRANSLATOR, jobId, taskId, taskType);
         } catch (Exception e) {
-            log.error("tryPublish failed, jobId: {}", jobId, e);
+            log.warn("tryPublish failed, jobId: {}", jobId, e);
         }
     }
 
@@ -127,7 +127,7 @@ public class TaskDisruptor implements Closeable {
         try {
             disruptor.publishEvent(TRANSLATOR, taskId, 0L, 
TaskType.TRANSIENT_TASK);
         } catch (Exception e) {
-            log.error("tryPublish failed, taskId: {}", taskId, e);
+            log.warn("tryPublish failed, taskId: {}", taskId, e);
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to