This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 83f8c367b4 [Improve][Zeta] Make sure the pending job schedule always works (#9532)
83f8c367b4 is described below

commit 83f8c367b4087964fabe188b6df9cac0712d8fb6
Author: Jia Fan <[email protected]>
AuthorDate: Fri Jul 4 09:32:34 2025 +0800

    [Improve][Zeta] Make sure the pending job schedule always works (#9532)
---
 .../apache/seatunnel/engine/server/CoordinatorService.java   | 12 ++++++++----
 .../seatunnel/engine/server/utils/PeekBlockingQueue.java     | 12 ------------
 2 files changed, 8 insertions(+), 16 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 88af17e1ca..13ff37547b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -230,10 +230,14 @@ public class CoordinatorService {
                     while (true) {
                         try {
                             pendingJobSchedule();
-                        } catch (InterruptedException e) {
-                            throw new RuntimeException(e);
-                        } finally {
-                            pendingJob.release();
+                        } catch (Throwable e) {
+                            logger.severe("Error in pending job schedule 
thread", e);
+                            try {
+                                Thread.sleep(3000L);
+                            } catch (InterruptedException ex) {
+                                logger.severe("Pending job schedule thread 
interrupted", ex);
+                                Thread.currentThread().interrupt();
+                            }
                         }
                     }
                 };
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/PeekBlockingQueue.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/PeekBlockingQueue.java
index d89e8a4565..6e9b206913 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/PeekBlockingQueue.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/PeekBlockingQueue.java
@@ -61,18 +61,6 @@ public class PeekBlockingQueue<E> {
         return queue.take();
     }
 
-    public void release() {
-        lock.lock();
-        try {
-            if (queue.isEmpty()) {
-                return;
-            }
-            notEmpty.signalAll();
-        } finally {
-            lock.unlock();
-        }
-    }
-
     public E peekBlocking() throws InterruptedException {
         lock.lock();
         try {

Reply via email to