This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e20146d44b3 Drop unneeded unique() call on SQL (#43064)
e20146d44b3 is described below

commit e20146d44b340f719f7fb432f93741e011690558
Author: Tzu-ping Chung <uranu...@gmail.com>
AuthorDate: Wed Oct 16 12:08:50 2024 +0800

    Drop unneeded unique() call on SQL (#43064)
---
 airflow/jobs/scheduler_job_runner.py | 24 ++++++++++--------------
 1 file changed, 10 insertions(+), 14 deletions(-)

diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 951602e14e6..7958adb64cf 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1974,20 +1974,16 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
 
         self.log.debug("Finding 'running' jobs without a recent heartbeat")
         limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs)
-        zombies = (
-            session.execute(
-                select(TI, DM.fileloc, DM.processor_subdir)
-                .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
-                .join(Job, TI.job_id == Job.id)
-                .join(DM, TI.dag_id == DM.dag_id)
-                .where(TI.state == TaskInstanceState.RUNNING)
-                .where(or_(Job.state != JobState.RUNNING, Job.latest_heartbeat < limit_dttm))
-                .where(Job.job_type == "LocalTaskJob")
-                .where(TI.queued_by_job_id == self.job.id)
-            )
-            .unique()
-            .all()
-        )
+        zombies = session.execute(
+            select(TI, DM.fileloc, DM.processor_subdir)
+            .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
+            .join(Job, TI.job_id == Job.id)
+            .join(DM, TI.dag_id == DM.dag_id)
+            .where(TI.state == TaskInstanceState.RUNNING)
+            .where(or_(Job.state != JobState.RUNNING, Job.latest_heartbeat < limit_dttm))
+            .where(Job.job_type == "LocalTaskJob")
+            .where(TI.queued_by_job_id == self.job.id)
+        ).all()
         if zombies:
             self.log.warning("Failing (%s) jobs without heartbeat after %s", len(zombies), limit_dttm)
         return zombies

Reply via email to