This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 83f8c367b4 [Improve][Zeta] Make sure the pending job schedule always works (#9532)
83f8c367b4 is described below
commit 83f8c367b4087964fabe188b6df9cac0712d8fb6
Author: Jia Fan <[email protected]>
AuthorDate: Fri Jul 4 09:32:34 2025 +0800
[Improve][Zeta] Make sure the pending job schedule always works (#9532)
---
.../apache/seatunnel/engine/server/CoordinatorService.java | 12 ++++++++----
.../seatunnel/engine/server/utils/PeekBlockingQueue.java | 12 ------------
2 files changed, 8 insertions(+), 16 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 88af17e1ca..13ff37547b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -230,10 +230,14 @@ public class CoordinatorService {
while (true) {
try {
pendingJobSchedule();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } finally {
- pendingJob.release();
+ } catch (Throwable e) {
+ logger.severe("Error in pending job schedule thread", e);
+ try {
+ Thread.sleep(3000L);
+ } catch (InterruptedException ex) {
+ logger.severe("Pending job schedule thread interrupted", ex);
+ Thread.currentThread().interrupt();
+ }
}
}
};
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/PeekBlockingQueue.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/PeekBlockingQueue.java
index d89e8a4565..6e9b206913 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/PeekBlockingQueue.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/PeekBlockingQueue.java
@@ -61,18 +61,6 @@ public class PeekBlockingQueue<E> {
return queue.take();
}
- public void release() {
- lock.lock();
- try {
- if (queue.isEmpty()) {
- return;
- }
- notEmpty.signalAll();
- } finally {
- lock.unlock();
- }
- }
-
public E peekBlocking() throws InterruptedException {
lock.lock();
try {