Thanks all and thanks for the thoughtful questions Elad, and +1 to
Przemyslaw's point
on testing.
1. Multiple exceptions per rule -- agreed, this is useful. I've added it
to the implementation and updated the AIP. RetryRule now accepts a list:
RetryRule(
exception=[ConnectionError, TimeoutError,
"requests.exceptions.HTTPError"],
action=RetryAction.RETRY,
retry_delay=timedelta(seconds=30),
)
PR updated: https://github.com/apache/airflow/pull/65474
2. Xcom-based retry decisions -- interesting idea. The context parameter
passed to policy.evaluate() already gives access to xcom via
context["ti"].xcom_pull(), so a custom RetryPolicy subclass can use
previous attempt data today. Making this a first-class pattern is worth
exploring as a follow-up.
3. Priority weight integration -- makes sense as a separate extension.
Probably belongs in AIP-100 or a follow-up.
4. Testing policies (Przemyslaw's point) -- since policy.evaluate() is a
regular Python function, you can test it directly:
policy = ExceptionRetryPolicy(rules=[...])
assert policy.evaluate(ValueError("x"), 1, 3).action == RetryAction.FAIL
Works in a REPL, unit test, or CI. No special tooling needed. I'll add a
short note about this in the docs.
Thanks,
Kaxil
On Mon, 20 Apr 2026 at 21:50, Przemysław Mirowski <[email protected]> wrote:
> Great idea! Thanks for proposing it. It will make proper exception-retry
> handling much easier than it was before and will open a new door for more
> extensibility too.
>
> +1 also to the questions/concerts which Elad mentioned. Not sure though
> regarding the changes to Priority Weight (maybe part of AIP-100) and point
> 2 connected to not having full control over exception raised, looking at
> the Airflow ecosystem, all of the providers with different libraries, I
> think it is something which we should consider.
>
> One additional comment - as the Retry Policies will only run on workers
> (which is pretty nice from e.g. security point of view), I didn't see in
> AIP and PR a way to validate if configured Retry Policy will work before
> actually the time when it will be needed. That can make setting the Retry
> Policies harder and the testing them will be cumbersome. I think that
> having a nice way (from Dag Authors perspective) of testing the defined
> Retry Policy if it will actually work when it really be needed, would make
> Dag Authors lifes much easier and defining these rules much easier
> (something in some way connected to that could be testing the Airflow
> Connections and work for moving the "Test Connection" to workers). Of
> course, Retry Policies like LLM-related are rather out-of-scope, but
> testing more deterministic behaviours should be much easier to do.
>
> ________________________________
> From: Vincent Beck <[email protected]>
> Sent: 20 April 2026 15:17
> To: [email protected] <[email protected]>
> Subject: Re: [DISCUSS] AIP-105: Pluggable Retry Policies
>
> Makes a lot of sense to me!
>
> On 2026/04/19 13:56:56 Elad Kalif wrote:
> > Great idea!
> > Love it!
> >
> > I have some questions / comments:
> > 1. The current interface suggests rules that contain a RetryRule object.
> > but I wonder if we should change exception to exceptions and accepting a
> > list.
> >
> > rules=[
> > RetryRule(
> > exceptions=["requests.exceptions.HTTPError",
> > "google.auth.exceptions.RefreshError"]
> > ...,
> > )]
> >
> > I'm thinking about a case where several exceptions need the same
> behaviour
> > and user may not wish to offer different reasoning for each.
> >
> > 2. Does it make sense to extend the interface for xcom values? I'm
> thinking
> > about a case where dag authors don't have full control over the exception
> > raised or even some upstream library changing the exception which results
> > in retry logic to be broken. Maybe we should offer also the option to set
> > retry based on previous attempt xcom value?
> >
> > 3. Maybe something for the longer run but still worth discussing - one of
> > the main motivations for custom weight rules
> >
> https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/priority-weight.html#custom-weight-rule
> > was to set priority based on try number. I wonder if we may want to
> somehow
> > combine it with the Retry rule. For retries, I can argue that the weight
> of
> > the task is a property of retry instructions and it can very be that the
> > weight will change depending on the exception.
> >
> > On Sun, Apr 19, 2026 at 6:30 AM Shahar Epstein <[email protected]>
> wrote:
> >
> > > Great idea! I liked both the deterministic approach as well as the AI
> > > integrated.
> > >
> > >
> > > Shahar
> > >
> > > On Sat, Apr 18, 2026 at 3:02 AM Kaxil Naik <[email protected]>
> wrote:
> > >
> > > > Hi all,
> > > >
> > > > Continuing the push to make Airflow AI-native, I have put together
> > > AIP-105:
> > > > Pluggable Retry Policies.
> > > >
> > > > Wiki:
> > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-105%3A+Pluggable+Retry+Policies
> > > > PR (core): https://github.com/apache/airflow/pull/65450
> > > > PR (LLM-powered, common-ai provider):
> > > > https://github.com/apache/airflow/pull/65451
> > > >
> > > > The problem is straightforward: Airflow retries every failure the
> same
> > > way.
> > > > An expired API key gets retried 3 times over 15 minutes. A
> rate-limited
> > > API
> > > > gets retried immediately, hitting the same 429. Users who want
> smarter
> > > > retries today have to wrap every task in try/except and raise
> > > > AirflowFailException manually, mixing retry logic into business
> logic.
> > > >
> > > > This AIP adds a retry_policy parameter to BaseOperator. The policy
> > > > evaluates the actual exception at failure time and returns RETRY
> (with a
> > > > custom delay), FAIL (skip remaining retries), or DEFAULT (standard
> > > > behaviour). It runs in the worker process, not the scheduler.
> > > >
> > > > Declarative example:
> > > >
> > > > ```python
> > > > @task(
> > > > retries=5,
> > > > retry_policy=ExceptionRetryPolicy(
> > > > rules=[
> > > > RetryRule(
> > > > exception="requests.exceptions.HTTPError",
> > > > action=RetryAction.RETRY,
> > > > retry_delay=timedelta(minutes=5)
> > > > ),
> > > > RetryRule(
> > > > exception="google.auth.exceptions.RefreshError",
> > > > action=RetryAction.FAIL
> > > > ),
> > > > ]
> > > > ),
> > > > )
> > > > def call_api():
> > > > ...
> > > > ```
> > > >
> > > > LLM-powered example -- uses any pydantic-ai provider (OpenAI,
> Anthropic,
> > > > Bedrock, Ollama):
> > > >
> > > > @task(retries=5, retry_policy=(llm_conn_id="my_llm"))
> > > > def call_flaky_api(): ...
> > > >
> > > > The LLM version classifies errors into categories (auth, rate_limit,
> > > > network, data, transient, permanent) using structured output with a
> > > > 30-second timeout and declarative fallback rules for when the LLM
> itself
> > > is
> > > > down.
> > > >
> > > > I have attached demo videos and screenshots to both PRs showing both
> > > > policies running end-to-end in Airflow -- including the LLM correctly
> > > > classifying 4 different error types via Claude Haiku.
> > > >
> > > > Full design, done criteria, and implementation details are in the
> wiki
> > > page
> > > > above.
> > > >
> > > > Feedback welcome.
> > > >
> > > > Thanks,
> > > > Kaxil
> > > >
> > >
> >
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [email protected]
> For additional commands, e-mail: [email protected]
>
>