This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-10-test by this push:
     new 954abd1f2ee Add retry on error 502 and 504 (#42994) (#43044)
954abd1f2ee is described below

commit 954abd1f2eee4e9ad3caf33348270aa6f92f04c3
Author: Jens Scheffler <95105677+jsche...@users.noreply.github.com>
AuthorDate: Tue Oct 15 21:17:04 2024 +0200

    Add retry on error 502 and 504 (#42994) (#43044)
    
    * Add retry on error 502 and 504
    
    * fix mypy findings
    
    * Add pytest
    
    * Convert response code to HTTPStatus
    
    * Add docs to retriable exception
    
    * extend docs for AirflowHttpException
    
    * Fix syntax and typos
    
    * fix pytest
    
    * fix static checks
    
    * fix some static checks
    
    * Fix ruff
    
    * fix pre-commit
    
    ---------
    
    Co-authored-by: Majoros Donat (XC-DX/EET2-Bp) <donat.major...@hu.bosch.com>
    (cherry picked from commit 477d74718adb9b3d68621910af061891b9bdcc17)
    
    Co-authored-by: majorosdonat <mjd...@bosch.com>
---
 airflow/api_internal/internal_api_call.py    | 33 +++++++++++++++++++++++++---
 tests/api_internal/test_internal_api_call.py | 23 +++++++++++++++++++
 2 files changed, 53 insertions(+), 3 deletions(-)

diff --git a/airflow/api_internal/internal_api_call.py b/airflow/api_internal/internal_api_call.py
index 8838377877b..064834d7c86 100644
--- a/airflow/api_internal/internal_api_call.py
+++ b/airflow/api_internal/internal_api_call.py
@@ -21,6 +21,7 @@ import inspect
 import json
 import logging
 from functools import wraps
+from http import HTTPStatus
 from typing import Callable, TypeVar
 from urllib.parse import urlparse
 
@@ -40,6 +41,14 @@ RT = TypeVar("RT")
 logger = logging.getLogger(__name__)
 
 
+class AirflowHttpException(AirflowException):
+    """Raise when there is a problem during an http request on the internal API decorator."""
+
+    def __init__(self, message: str, status_code: HTTPStatus):
+        super().__init__(message)
+        self.status_code = status_code
+
+
 class InternalApiConfig:
     """Stores and caches configuration for Internal API."""
 
@@ -105,10 +114,27 @@ def internal_api_call(func: Callable[PS, RT]) -> Callable[PS, RT]:
     """
     from requests.exceptions import ConnectionError
 
+    def _is_retryable_exception(exception: BaseException) -> bool:
+        """
+        Evaluate which exception types to retry.
+
+        This is especially demanded for cases where an application gateway or Kubernetes ingress can
+        not find a running instance of a webserver hosting the API (HTTP 502+504) or when the
+        HTTP request fails in general on network level.
+
+        Note that we want to fail on other general errors on the webserver not to send bad requests in an endless loop.
+        """
+        retryable_status_codes = (HTTPStatus.BAD_GATEWAY, HTTPStatus.GATEWAY_TIMEOUT)
+        return (
+            isinstance(exception, AirflowHttpException)
+            and exception.status_code in retryable_status_codes
+            or isinstance(exception, (ConnectionError, NewConnectionError))
+        )
+
     @tenacity.retry(
         stop=tenacity.stop_after_attempt(10),
         wait=tenacity.wait_exponential(min=1),
-        retry=tenacity.retry_if_exception_type((NewConnectionError, ConnectionError)),
+        retry=tenacity.retry_if_exception(_is_retryable_exception),
         before_sleep=tenacity.before_log(logger, logging.WARNING),
     )
     def make_jsonrpc_request(method_name: str, params_json: str) -> bytes:
@@ -126,9 +152,10 @@ def internal_api_call(func: Callable[PS, RT]) -> Callable[PS, RT]:
         internal_api_endpoint = InternalApiConfig.get_internal_api_endpoint()
         response = requests.post(url=internal_api_endpoint, data=json.dumps(data), headers=headers)
         if response.status_code != 200:
-            raise AirflowException(
+            raise AirflowHttpException(
                 f"Got {response.status_code}:{response.reason} when sending "
-                f"the internal api request: {response.text}"
+                f"the internal api request: {response.text}",
+                HTTPStatus(response.status_code),
             )
         return response.content
 
diff --git a/tests/api_internal/test_internal_api_call.py b/tests/api_internal/test_internal_api_call.py
index d779b504ea4..c619e1a695f 100644
--- a/tests/api_internal/test_internal_api_call.py
+++ b/tests/api_internal/test_internal_api_call.py
@@ -25,6 +25,7 @@ from unittest import mock
 
 import pytest
 import requests
+from tenacity import RetryError
 
 from airflow.__main__ import configure_internal_api
 from airflow.api_internal.internal_api_call import InternalApiConfig, internal_api_call
@@ -266,6 +267,28 @@ class TestInternalApiCall:
         assert call_kwargs["headers"]["Content-Type"] == "application/json"
         assert "Authorization" in call_kwargs["headers"]
 
+    @conf_vars(
+        {
+            ("core", "database_access_isolation"): "true",
+            ("core", "internal_api_url"): "http://localhost:8888",
+            ("database", "sql_alchemy_conn"): "none://",
+        }
+    )
+    @mock.patch("airflow.api_internal.internal_api_call.requests")
+    @mock.patch("tenacity.time.sleep")
+    def test_retry_on_bad_gateway(self, mock_sleep, mock_requests):
+        configure_internal_api(Namespace(subcommand="dag-processor"), conf)
+        response = requests.Response()
+        response.status_code = 502
+        response.reason = "Bad Gateway"
+        response._content = b"Bad Gateway"
+
+        mock_sleep = lambda *_, **__: None  # noqa: F841
+        mock_requests.post.return_value = response
+        with pytest.raises(RetryError):
+            TestInternalApiCall.fake_method_with_params("fake-dag", task_id=123, session="session")
+        assert mock_requests.post.call_count == 10
+
     @conf_vars(
         {
             ("core", "database_access_isolation"): "true",

Reply via email to