Yes, indeed this helps a lot overall with the retry mechanics. Thanks, Kaxil.
On Sat, Apr 18, 2026 at 7:32 PM Jarek Potiuk <[email protected]> wrote:

> Though.. of course need to take a deep look - but that sounds like "big"
> "small" improvement :)
>
> On Sat, Apr 18, 2026 at 8:29 PM Jarek Potiuk <[email protected]> wrote:
>
> > Very nice and straightforward.
> >
> > J.
> >
> >
> > On Sat, Apr 18, 2026 at 6:15 PM Jens Scheffler <[email protected]>
> > wrote:
> >
> >> Hi Kaxil,
> >>
> >> very cool proposal! Added just a few comments and would LOVE to see this
> >> in 3.3! This really opens the door (securely!) to extend the retry logic
> >> to much more use cases!
> >>
> >> Jens
> >>
> >> On 18.04.26 09:02, Kaxil Naik wrote:
> >> > Example in the email got cut but check the docs or demo video in 2nd
> >> > PR, you can either pass custom instructions or it uses the default
> >> > instructions:
> >> >
> >> > llm_policy = LLMRetryPolicy(
> >> >     llm_conn_id="pydanticai_default",
> >> >     instructions="...",
> >> >     timeout=30.0,  # max seconds to wait for LLM response
> >> >     fallback_rules=[  # used when LLM call fails
> >> >         RetryRule(exception=ConnectionError, action=RetryAction.RETRY,
> >> >                   retry_delay=timedelta(seconds=10)),
> >> >         RetryRule(exception=PermissionError, action=RetryAction.FAIL),
> >> >     ],
> >> > )
> >> >
> >> > @task(retries=5, retry_policy=llm_policy)
> >> > def call_external_api():
> >> >     ...
> >> >
> >> >
> >> > https://github.com/apache/airflow/commit/effb3ef00d29a476010d502d15dcebc1cd11cdb6
> >> >
> >> > https://github.com/apache/airflow/blob/effb3ef00d29a476010d502d15dcebc1cd11cdb6/providers/common/ai/src/airflow/providers/common/ai/policies/retry.py#L49-L62
> >> >
> >> >
> >> > On Sat, 18 Apr 2026 at 05:09, Dev-iL <[email protected]> wrote:
> >> >
> >> >> Sounds very useful!
> >> >>
> >> >> Regarding the llm-powered case: where do the system prompt or custom
> >> >> user instructions go? The only thing we specified is the connection
> >> >> id, yet the connection doesn't have a system prompt field (at least
> >> >> according to
> >> >> https://airflow.apache.org/docs/apache-airflow-providers-common-ai/stable/connections/pydantic_ai.html
> >> >> ).
> >> >> So how do we configure the agent to classify into nonstandard
> >> >> categories, or to behave according to our specifications when certain
> >> >> types of errors are encountered?
> >> >>
> >> >> Best,
> >> >> Dev-iL
> >> >>
> >> >> On Sat, 18 Apr 2026, 3:02 Kaxil Naik, <[email protected]> wrote:
> >> >>
> >> >>> Hi all,
> >> >>>
> >> >>> Continuing the push to make Airflow AI-native, I have put together
> >> >>> AIP-105: Pluggable Retry Policies.
> >> >>>
> >> >>> Wiki:
> >> >>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-105%3A+Pluggable+Retry+Policies
> >> >>> PR (core): https://github.com/apache/airflow/pull/65450
> >> >>> PR (LLM-powered, common-ai provider):
> >> >>> https://github.com/apache/airflow/pull/65451
> >> >>>
> >> >>> The problem is straightforward: Airflow retries every failure the
> >> >>> same way. An expired API key gets retried 3 times over 15 minutes. A
> >> >>> rate-limited API gets retried immediately, hitting the same 429.
> >> >>> Users who want smarter retries today have to wrap every task in
> >> >>> try/except and raise AirflowFailException manually, mixing retry
> >> >>> logic into business logic.
> >> >>>
> >> >>> This AIP adds a retry_policy parameter to BaseOperator. The policy
> >> >>> evaluates the actual exception at failure time and returns RETRY
> >> >>> (with a custom delay), FAIL (skip remaining retries), or DEFAULT
> >> >>> (standard behaviour). It runs in the worker process, not the
> >> >>> scheduler.
> >> >>>
> >> >>> Declarative example:
> >> >>>
> >> >>> ```python
> >> >>> @task(
> >> >>>     retries=5,
> >> >>>     retry_policy=ExceptionRetryPolicy(
> >> >>>         rules=[
> >> >>>             RetryRule(
> >> >>>                 exception="requests.exceptions.HTTPError",
> >> >>>                 action=RetryAction.RETRY,
> >> >>>                 retry_delay=timedelta(minutes=5)
> >> >>>             ),
> >> >>>             RetryRule(
> >> >>>                 exception="google.auth.exceptions.RefreshError",
> >> >>>                 action=RetryAction.FAIL
> >> >>>             ),
> >> >>>         ]
> >> >>>     ),
> >> >>> )
> >> >>> def call_api():
> >> >>>     ...
> >> >>> ```
> >> >>>
> >> >>> LLM-powered example -- uses any pydantic-ai provider (OpenAI,
> >> >>> Anthropic, Bedrock, Ollama):
> >> >>>
> >> >>> @task(retries=5, retry_policy=LLMRetryPolicy(llm_conn_id="my_llm"))
> >> >>> def call_flaky_api(): ...
> >> >>>
> >> >>> The LLM version classifies errors into categories (auth, rate_limit,
> >> >>> network, data, transient, permanent) using structured output with a
> >> >>> 30-second timeout and declarative fallback rules for when the LLM
> >> >>> itself is down.
> >> >>>
> >> >>> I have attached demo videos and screenshots to both PRs showing both
> >> >>> policies running end-to-end in Airflow -- including the LLM
> >> >>> correctly classifying 4 different error types via Claude Haiku.
> >> >>>
> >> >>> Full design, done criteria, and implementation details are in the
> >> >>> wiki page above.
> >> >>>
> >> >>> Feedback welcome.
> >> >>>
> >> >>> Thanks,
> >> >>> Kaxil
> >> >>>
> >>
> >> ---------------------------------------------------------------------
> >> To unsubscribe, e-mail: [email protected]
> >> For additional commands, e-mail: [email protected]
> >>
> >>
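[Editor's note] For readers who want to experiment with the rule-evaluation semantics described in the thread (first matching rule wins; RETRY with a custom delay, FAIL, or DEFAULT otherwise), here is a minimal self-contained sketch in plain Python. It does not use the actual Airflow provider API; the class and attribute names mirror the examples quoted above but are reimplemented here as assumptions for illustration only.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import timedelta
from enum import Enum


class RetryAction(Enum):
    RETRY = "retry"      # retry with the rule's custom delay
    FAIL = "fail"        # skip remaining retries, fail immediately
    DEFAULT = "default"  # fall back to standard retry behaviour


@dataclass
class RetryRule:
    exception: type            # exception class this rule matches
    action: RetryAction
    retry_delay: timedelta | None = None


@dataclass
class ExceptionRetryPolicy:
    rules: list[RetryRule] = field(default_factory=list)

    def evaluate(self, exc: BaseException) -> tuple[RetryAction, timedelta | None]:
        # First matching rule wins; unmatched exceptions get DEFAULT behaviour.
        for rule in self.rules:
            if isinstance(exc, rule.exception):
                return rule.action, rule.retry_delay
        return RetryAction.DEFAULT, None


policy = ExceptionRetryPolicy(
    rules=[
        RetryRule(ConnectionError, RetryAction.RETRY, timedelta(seconds=10)),
        RetryRule(PermissionError, RetryAction.FAIL),
    ]
)

print(policy.evaluate(ConnectionError("boom")))    # transient: RETRY with 10s delay
print(policy.evaluate(PermissionError("denied")))  # permanent: FAIL
print(policy.evaluate(ValueError("odd")))          # unmatched: DEFAULT
```

This matches the behaviour the announcement describes (an expired credential fails fast, a rate-limited call retries after a delay) without any Airflow dependency; the real implementation lives in the linked PRs.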
