kaxil commented on code in PR #42837:
URL: https://github.com/apache/airflow/pull/42837#discussion_r1795668324


##########
tests/jobs/test_scheduler_job.py:
##########
@@ -5091,6 +5091,94 @@ def _running_counts():
         assert session.scalar(select(func.count()).select_from(DagRun)) == 46
         assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 36
 
+    @pytest.mark.parametrize(
+        "pause_it, expected_running",
+        [
+            (True, 0),
+            (False, 3),
+        ],
+    )
+    def test_backfill_runs_not_started_when_backfill_paused(
+        self, pause_it, expected_running, dag_maker, session
+    ):
+        """
+        When backfill is paused, will not start.
+        """
+        dag1_dag_id = "test_dag1"
+        with dag_maker(
+            dag_id=dag1_dag_id,
+            start_date=DEFAULT_DATE,
+            schedule=timedelta(days=1),
+            max_active_runs=1,
+        ):
+            EmptyOperator(task_id="mytask")
+
+        def _running_counts():
+            dag1_non_b_running = (
+                session.query(func.count(DagRun.id))
+                .filter(
+                    DagRun.dag_id == dag1_dag_id,
+                    DagRun.state == State.RUNNING,
+                    DagRun.run_type != DagRunType.BACKFILL_JOB,
+                )
+                .scalar()
+            )
+            dag1_b_running = (
+                session.query(func.count(DagRun.id))
+                .filter(
+                    DagRun.dag_id == dag1_dag_id,
+                    DagRun.state == State.RUNNING,
+                    DagRun.run_type == DagRunType.BACKFILL_JOB,
+                )
+                .scalar()
+            )
+            total_running_count = (
+                session.query(func.count(DagRun.id)).filter(DagRun.state == State.RUNNING).scalar()
+            )
+            return dag1_non_b_running, dag1_b_running, total_running_count
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
+        scheduler_job.executor = MockExecutor(do_update=False)
+        self.job_runner.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+        from_date = pendulum.parse("2021-01-01")
+        to_date = pendulum.parse("2021-01-06")
+        b = _create_backfill(
+            dag_id=dag1_dag_id,
+            from_date=from_date,
+            to_date=to_date,
+            max_active_runs=3,
+            reverse=False,
+            dag_run_conf={},
+        )
+        dag1_non_b_running, dag1_b_running, total_running = _running_counts()
+
+        # initial state -- nothing is running
+        assert dag1_non_b_running == 0
+        assert dag1_b_running == 0
+        assert total_running == 0
+        assert session.query(func.count(DagRun.id)).scalar() == 6
+        assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 6
+
+        if pause_it:
+            b = session.get(Backfill, b.id)
+            b.is_paused = True
+
+        session.commit()
+
+        # now let's run scheduler it once

Review Comment:
   ```suggestion
           # now let's run scheduler once
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to