dstandish commented on code in PR #42932:
URL: https://github.com/apache/airflow/pull/42932#discussion_r1801511669


##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1953,73 +1953,80 @@ def check_trigger_timeouts(
                 if num_timed_out_tasks:
                     self.log.info("Timed out %i deferred tasks without fired 
triggers", num_timed_out_tasks)
 
-    # [START find_zombies]
-    def _find_zombies(self) -> None:
+    # [START find_and_purge_zombies]
+    def _find_and_purge_zombies(self) -> None:
         """
-        Find zombie task instances and create a TaskCallbackRequest to be 
handled by the DAG processor.
+        Find and purge zombie task instances.
 
-        Zombie instances are tasks haven't heartbeated for too long or have a 
no-longer-running LocalTaskJob.
+        Zombie instances are tasks that failed to heartbeat for too long, or
+        have a no-longer-running LocalTaskJob.
+
+        A TaskCallbackRequest is also created for the killed zombie to be
+        handled by the DAG processor, and the executor is informed to no longer
+        count the zombie as running when it calculates parallelism.
         """
+        with create_session() as session:
+            if zombies := self._find_zombies(session=session):
+                self._purge_zombies(zombies, session=session)
+
+    def _find_zombies(self, *, session: Session) -> list[tuple[TI, str, str]]:
         from airflow.jobs.job import Job
 
         self.log.debug("Finding 'running' jobs without a recent heartbeat")
         limit_dttm = timezone.utcnow() - 
timedelta(seconds=self._zombie_threshold_secs)
-
-        with create_session() as session:
-            zombies: list[tuple[TI, str, str]] = (
-                session.execute(
-                    select(TI, DM.fileloc, DM.processor_subdir)
-                    .with_hint(TI, "USE INDEX (ti_state)", 
dialect_name="mysql")
-                    .join(Job, TI.job_id == Job.id)
-                    .join(DM, TI.dag_id == DM.dag_id)
-                    .where(TI.state == TaskInstanceState.RUNNING)
-                    .where(
-                        or_(
-                            Job.state != JobState.RUNNING,
-                            Job.latest_heartbeat < limit_dttm,
-                        )
-                    )
-                    .where(Job.job_type == "LocalTaskJob")
-                    .where(TI.queued_by_job_id == self.job.id)
-                )
-                .unique()
-                .all()
+        zombies = (
+            session.execute(
+                select(TI, DM.fileloc, DM.processor_subdir)
+                .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
+                .join(Job, TI.job_id == Job.id)
+                .join(DM, TI.dag_id == DM.dag_id)
+                .where(TI.state == TaskInstanceState.RUNNING)
+                .where(or_(Job.state != JobState.RUNNING, Job.latest_heartbeat 
< limit_dttm))
+                .where(Job.job_type == "LocalTaskJob")
+                .where(TI.queued_by_job_id == self.job.id)
             )
-
+            .unique()

Review Comment:
   like, the data will be 1 row per TI, and I do not see any possibility of 
introducing duplicate TIs



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to