hawk9821 commented on code in PR #9749:
URL: https://github.com/apache/seatunnel/pull/9749#discussion_r2302800359
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -248,25 +241,25 @@ private void startPendingJobScheduleThread() {
}
private void pendingJobSchedule() throws InterruptedException {
- JobMaster jobMaster = pendingJob.peekBlocking();
- if (Objects.isNull(jobMaster)) {
- // This situation almost never happens because pendingJobSchedule
is single-threaded
- logger.warning("The peek job master is null");
+ PendingJobInfo pendingJobInfo = pendingJobQueue.peekBlocking();
+ if (Objects.isNull(pendingJobInfo)) {
+ logger.warning("The peek job info is null");
Thread.sleep(3000);
return;
}
- Long jobId = jobMaster.getJobId();
+ Long jobId = pendingJobInfo.getJobId();
+ JobMaster jobMaster = pendingJobInfo.getJobMaster();
- if (!pendingJobMasterMap.containsKey(jobId)) {
+ if (!pendingJobQueue.containsJobId(jobId)) {
logger.fine(String.format("Job ID : %s already cancelled", jobId));
- queueRemove(jobMaster);
Review Comment:
Modify queueRemove method:
```
private void queueRemove(JobMaster jobMaster) throws InterruptedException {
PendingJobInfo take = pendingJobQueue.peekBlocking();
if (take.getJobMaster() != jobMaster) {
logger.severe("The job master is not equal to the peek job
master");
} else {
pendingJobQueue.take();
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]