ReadytoRocc opened a new issue #19120:
URL: https://github.com/apache/airflow/issues/19120
### Apache Airflow version
2.2.0 (latest released)
### Operating System
Container-Optimized OS with Containerd (cos_containerd) - GKE
### Versions of Apache Airflow Providers
_No response_
### Deployment
Astronomer
### Deployment details
_No response_
### What happened
I went to clear a successful task instance that is using a Deferred
operator, and noticed it immediately went into a state of successful. The
Operator is designed to run a trigger that will wait 1 minute from
approximately task start. Upon investigation, I did not see the following in
the second run's task log: `{taskinstance.py:1332} INFO - Pausing task as
DEFERRED.`.
### What you expected to happen
I would expect the task to once again go into a state of deferred for
approx. 1 minute and then succeed.
### How to reproduce
Run the following DAG. Once a task instance is successful, clear it, and see
the task fail. This confirms `ti.next_method` was not cleared, as `execute` did
not rerun and reset `execute_try_number`. `execute` did not rerun, as
`ti.next_method` was not cleared.
```
from datetime import datetime
from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.triggers.testing import SuccessTrigger
class RetryOperator(BaseOperator):
def execute(self, context):
ti = context["ti"]
next_method = ti.next_method
try_number = ti.try_number
self.log.info(
f"In `execute`: try_number: {try_number}, next_method
{next_method}."
)
self.defer(
trigger=SuccessTrigger(),
method_name="next",
kwargs={"execute_try_number": try_number},
)
def next(self, context, execute_try_number, event=None):
ti = context["ti"]
next_method = ti.next_method
try_number = ti.try_number
self.log.info(
f"In `next`: try_number: {try_number}, next_method
{next_method}, execute_try_number: {execute_try_number}."
)
if execute_try_number != try_number:
raise AirflowException("`execute` wasn't run during clear!")
return None # Success!
with DAG(
"triggerer_clear", schedule_interval=None, start_date=datetime(2021, 10,
20)
) as dag:
RetryOperator(task_id="clear")
```
### Anything else
I believe this is due to `ti.next_method` (and `ti.next_method_kwargs`) not
being cleared after a task has completed. A similar issue was raised in
https://github.com/apache/airflow/issues/18146.
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]