This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e286ee6 Add unittest for #17305 (#18806)
e286ee6 is described below
commit e286ee64c5c0aadd79a5cd86f881fb1acfbf317e
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Fri Oct 8 04:24:52 2021 +0100
Add unittest for #17305 (#18806)
---
airflow/jobs/backfill_job.py | 2 ++
tests/jobs/test_backfill_job.py | 10 ++++++++++
2 files changed, 12 insertions(+)
diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py
index 64333d9..1bcb151 100644
--- a/airflow/jobs/backfill_job.py
+++ b/airflow/jobs/backfill_job.py
@@ -233,6 +233,8 @@ class BackfillJob(BaseJob):
# special case: if the task needs to be rescheduled put it back
elif ti.state == State.UP_FOR_RESCHEDULE:
self.log.warning("Task instance %s is up for reschedule", ti)
+ # During handling of reschedule state in
ti._handle_reschedule, try number is reduced
+ # by one, so we should not use reduced_key to avoid key error
ti_status.running.pop(ti.key)
ti_status.to_run[ti.key] = ti
# special case: The state of the task can be set to NONE by the
task itself
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index 32ae794..fd76c8f 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -1298,7 +1298,11 @@ class TestBackfillJob:
ti_status.to_run.clear()
# test for reschedule
+ # For rescheduled state, tests that reduced_key is not
+ # used by upping try_number.
+ ti._try_number = 2
ti.set_state(State.UP_FOR_RESCHEDULE, session)
+ assert ti.try_number == 3 # see ti.try_number property in
taskinstance module
ti_status.running[ti.key] = ti
job._update_counters(ti_status=ti_status)
assert len(ti_status.running) == 0
@@ -1311,6 +1315,12 @@ class TestBackfillJob:
# test for none
ti.set_state(State.NONE, session)
+ # Setting ti._try_number = 0 brings us to ti.try_number==1
+ # so that the reduced_key access will work fine
+ ti._try_number = 0
+ assert ti.try_number == 1 # see ti.try_number property in
taskinstance module
+ session.merge(ti)
+ session.commit()
ti_status.running[ti.key] = ti
job._update_counters(ti_status=ti_status)
assert len(ti_status.running) == 0