Example in the email got cut but check the docs or demo video in 2nd PR,
you can either pass custom instructions or it uses the default instructions:
llm_policy = LLMRetryPolicy(
llm_conn_id="pydanticai_default",
instructions="..."
timeout=30.0, # max seconds to wait for LLM response
fallback_rules=[ # used when LLM call fails
RetryRule(exception=ConnectionError, action=RetryAction.RETRY,
retry_delay=timedelta(seconds=10)),
RetryRule(exception=PermissionError, action=RetryAction.FAIL),
],
)
@task(retries=5, retry_policy=llm_policy)
def call_external_api():
...
https://github.com/apache/airflow/commit/effb3ef00d29a476010d502d15dcebc1cd11cdb6
https://github.com/apache/airflow/blob/effb3ef00d29a476010d502d15dcebc1cd11cdb6/providers/common/ai/src/airflow/providers/common/ai/policies/retry.py#L49-L62
On Sat, 18 Apr 2026 at 05:09, Dev-iL <[email protected]> wrote:
> Sounds very useful!
>
> Regarding the llm-powered case: where do the system prompt or custom user
> instructions go? The only thing we specified is the connection id, yet the
> connection doesn't have a system prompt field (at least according to
>
> https://airflow.apache.org/docs/apache-airflow-providers-common-ai/stable/connections/pydantic_ai.html
> ).
> So how do we configure the agent to classify into nonstandard categories
> or behave according to our specifications when certain types of errors are
> encountered?
>
> Best,
> Dev-iL
>
> On Sat, 18 Apr 2026, 3:02 Kaxil Naik, <[email protected]> wrote:
>
> > Hi all,
> >
> > Continuing the push to make Airflow AI-native, I have put together
> AIP-105:
> > Pluggable Retry Policies.
> >
> > Wiki:
> >
> >
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-105%3A+Pluggable+Retry+Policies
> > PR (core): https://github.com/apache/airflow/pull/65450
> > PR (LLM-powered, common-ai provider):
> > https://github.com/apache/airflow/pull/65451
> >
> > The problem is straightforward: Airflow retries every failure the same
> way.
> > An expired API key gets retried 3 times over 15 minutes. A rate-limited
> API
> > gets retried immediately, hitting the same 429. Users who want smarter
> > retries today have to wrap every task in try/except and raise
> > AirflowFailException manually, mixing retry logic into business logic.
> >
> > This AIP adds a retry_policy parameter to BaseOperator. The policy
> > evaluates the actual exception at failure time and returns RETRY (with a
> > custom delay), FAIL (skip remaining retries), or DEFAULT (standard
> > behaviour). It runs in the worker process, not the scheduler.
> >
> > Declarative example:
> >
> > ```python
> > @task(
> > retries=5,
> > retry_policy=ExceptionRetryPolicy(
> > rules=[
> > RetryRule(
> > exception="requests.exceptions.HTTPError",
> > action=RetryAction.RETRY,
> > retry_delay=timedelta(minutes=5)
> > ),
> > RetryRule(
> > exception="google.auth.exceptions.RefreshError",
> > action=RetryAction.FAIL
> > ),
> > ]
> > ),
> > )
> > def call_api():
> > ...
> > ```
> >
> > LLM-powered example -- uses any pydantic-ai provider (OpenAI, Anthropic,
> > Bedrock, Ollama):
> >
> > @task(retries=5, retry_policy=(llm_conn_id="my_llm"))
> > def call_flaky_api(): ...
> >
> > The LLM version classifies errors into categories (auth, rate_limit,
> > network, data, transient, permanent) using structured output with a
> > 30-second timeout and declarative fallback rules for when the LLM itself
> is
> > down.
> >
> > I have attached demo videos and screenshots to both PRs showing both
> > policies running end-to-end in Airflow -- including the LLM correctly
> > classifying 4 different error types via Claude Haiku.
> >
> > Full design, done criteria, and implementation details are in the wiki
> page
> > above.
> >
> > Feedback welcome.
> >
> > Thanks,
> > Kaxil
> >
>