Hi Kaxil,
very cool proposal! Added just a few comments and would LOVE to see this
in 3.3! This really opens the door (securely!) to extend the retry logic
to much more use cases!
Jens
On 18.04.26 09:02, Kaxil Naik wrote:
Example in the email got cut but check the docs or demo video in 2nd PR,
you can either pass custom instructions or it uses the default instructions:
llm_policy = LLMRetryPolicy(
llm_conn_id="pydanticai_default",
instructions="..."
timeout=30.0, # max seconds to wait for LLM response
fallback_rules=[ # used when LLM call fails
RetryRule(exception=ConnectionError, action=RetryAction.RETRY,
retry_delay=timedelta(seconds=10)),
RetryRule(exception=PermissionError, action=RetryAction.FAIL),
],
)
@task(retries=5, retry_policy=llm_policy)
def call_external_api():
...
https://github.com/apache/airflow/commit/effb3ef00d29a476010d502d15dcebc1cd11cdb6
https://github.com/apache/airflow/blob/effb3ef00d29a476010d502d15dcebc1cd11cdb6/providers/common/ai/src/airflow/providers/common/ai/policies/retry.py#L49-L62
On Sat, 18 Apr 2026 at 05:09, Dev-iL <[email protected]> wrote:
Sounds very useful!
Regarding the llm-powered case: where do the system prompt or custom user
instructions go? The only thing we specified is the connection id, yet the
connection doesn't have a system prompt field (at least according to
https://airflow.apache.org/docs/apache-airflow-providers-common-ai/stable/connections/pydantic_ai.html
).
So how do we configure the agent to classify into nonstandard categories
or behave according to our specifications when certain types of errors are
encountered?
Best,
Dev-iL
On Sat, 18 Apr 2026, 3:02 Kaxil Naik, <[email protected]> wrote:
Hi all,
Continuing the push to make Airflow AI-native, I have put together
AIP-105:
Pluggable Retry Policies.
Wiki:
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-105%3A+Pluggable+Retry+Policies
PR (core): https://github.com/apache/airflow/pull/65450
PR (LLM-powered, common-ai provider):
https://github.com/apache/airflow/pull/65451
The problem is straightforward: Airflow retries every failure the same
way.
An expired API key gets retried 3 times over 15 minutes. A rate-limited
API
gets retried immediately, hitting the same 429. Users who want smarter
retries today have to wrap every task in try/except and raise
AirflowFailException manually, mixing retry logic into business logic.
This AIP adds a retry_policy parameter to BaseOperator. The policy
evaluates the actual exception at failure time and returns RETRY (with a
custom delay), FAIL (skip remaining retries), or DEFAULT (standard
behaviour). It runs in the worker process, not the scheduler.
Declarative example:
```python
@task(
retries=5,
retry_policy=ExceptionRetryPolicy(
rules=[
RetryRule(
exception="requests.exceptions.HTTPError",
action=RetryAction.RETRY,
retry_delay=timedelta(minutes=5)
),
RetryRule(
exception="google.auth.exceptions.RefreshError",
action=RetryAction.FAIL
),
]
),
)
def call_api():
...
```
LLM-powered example -- uses any pydantic-ai provider (OpenAI, Anthropic,
Bedrock, Ollama):
@task(retries=5, retry_policy=(llm_conn_id="my_llm"))
def call_flaky_api(): ...
The LLM version classifies errors into categories (auth, rate_limit,
network, data, transient, permanent) using structured output with a
30-second timeout and declarative fallback rules for when the LLM itself
is
down.
I have attached demo videos and screenshots to both PRs showing both
policies running end-to-end in Airflow -- including the LLM correctly
classifying 4 different error types via Claude Haiku.
Full design, done criteria, and implementation details are in the wiki
page
above.
Feedback welcome.
Thanks,
Kaxil
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]