kaxil commented on code in PR #62816:
URL: https://github.com/apache/airflow/pull/62816#discussion_r2896204369


##########
providers/common/ai/src/airflow/providers/common/ai/hooks/pydantic_ai.py:
##########
@@ -157,13 +194,266 @@ def test_connection(self) -> tuple[bool, str]:
         """
         Test connection by resolving the model.
 
-        Validates that the model string is valid, the provider package is
-        installed, and the provider class can be instantiated. Does NOT make an
-        LLM API call — that would be expensive, flaky, and fail for reasons
-        unrelated to connectivity (quotas, billing, rate limits).
+        Validates that the model string is valid and the provider class can be
+        instantiated with the supplied credentials.  Does NOT make an LLM API
+        call — that would be expensive and fail for reasons unrelated to
+        connectivity (quotas, billing, rate limits).
         """
         try:
             self.get_conn()
             return True, "Model resolved successfully."
         except Exception as e:
             return False, str(e)
+
+    @classmethod
+    def for_connection(cls, conn_id: str, model_id: str | None = None) -> 
PydanticAIHook:
+        """
+        Return the correct :class:`PydanticAIHook` subclass for *conn_id*.
+
+        Looks up the connection's ``conn_type`` in the registered hook map and
+        instantiates the matching subclass.  Falls back to
+        :class:`PydanticAIHook` for unknown types.
+
+        :param conn_id: Airflow connection ID.
+        :param model_id: Optional model override forwarded to the hook.
+        """
+        conn = cls.get_connection(conn_id)
+        hook_cls = _CONN_TYPE_TO_HOOK.get(conn.conn_type or "", cls)
+        return hook_cls(llm_conn_id=conn_id, model_id=model_id)
+
+
+class PydanticAIAzureHook(PydanticAIHook):
+    """
+    Hook for Azure OpenAI via pydantic-ai.
+
+    Connection fields:
+        - **password**: Azure API key
+        - **host**: Azure endpoint (e.g. 
``https://<resource>.openai.azure.com``)
+        - **extra** JSON::
+
+            {"model": "azure:gpt-4o", "api_version": "2024-07-01-preview"}
+
+    :param llm_conn_id: Airflow connection ID.
+    :param model_id: Model identifier, e.g. ``"azure:gpt-4o"``.
+    """
+
+    conn_type = "pydanticai_azure"
+    hook_name = "Pydantic AI (Azure OpenAI)"
+
+    @staticmethod
+    def get_ui_field_behaviour() -> dict[str, Any]:
+        """Return custom field behaviour for the Airflow connection form."""
+        return {
+            "hidden_fields": ["schema", "port", "login"],
+            "relabeling": {"password": "API Key", "host": "Azure Endpoint"},
+            "placeholders": {
+                "host": "https://<resource>.openai.azure.com",
+                "extra": '{"model": "azure:gpt-4o", "api_version": 
"2024-07-01-preview"}',
+            },
+        }
+
+    def _get_provider_kwargs(
+        self,
+        api_key: str | None,
+        base_url: str | None,
+        extra: dict[str, Any],
+    ) -> dict[str, Any]:
+        kwargs: dict[str, Any] = {}
+        if api_key:
+            kwargs["api_key"] = api_key
+        if base_url:
+            kwargs["azure_endpoint"] = base_url
+        if extra.get("api_version"):
+            kwargs["api_version"] = extra["api_version"]
+        return kwargs
+
+
+class PydanticAIBedrockHook(PydanticAIHook):
+    """
+    Hook for AWS Bedrock via pydantic-ai.
+
+    Credentials are resolved in order:
+
+    1. IAM keys from ``extra`` (``aws_access_key_id`` + 
``aws_secret_access_key``,
+       optionally ``aws_session_token``).
+    2. Bearer token in ``extra`` (``api_key``, maps to env 
``AWS_BEARER_TOKEN_BEDROCK``).
+    3. Environment-variable / instance-role chain (``AWS_PROFILE``, IAM role, 
…)
+       when no explicit keys are provided.
+
+    Connection fields:
+        - **extra** JSON::
+
+            {
+              "model": "bedrock:us.anthropic.claude-opus-4-5",
+              "region_name": "us-east-1",
+              "aws_access_key_id": "AKIA...",
+              "aws_secret_access_key": "...",
+              "aws_session_token": "...",
+              "profile_name": "my-aws-profile",
+              "api_key": "bearer-token",
+              "base_url": "https://custom-bedrock-endpoint",
+              "aws_read_timeout": 60.0,
+              "aws_connect_timeout": 10.0
+            }
+
+          Leave ``aws_access_key_id`` / ``aws_secret_access_key`` and 
``api_key``
+          empty to use the default AWS credential chain.
+
+    :param llm_conn_id: Airflow connection ID.
+    :param model_id: Model identifier, e.g. 
``"bedrock:us.anthropic.claude-opus-4-5"``.
+    """
+
+    conn_type = "pydanticai_bedrock"
+    hook_name = "Pydantic AI (AWS Bedrock)"
+
+    @staticmethod
+    def get_ui_field_behaviour() -> dict[str, Any]:
+        """Return custom field behaviour for the Airflow connection form."""
+        return {
+            "hidden_fields": ["schema", "port", "login", "host", "password"],
+            "relabeling": {},
+            "placeholders": {
+                "extra": (
+                    '{"model": "bedrock:us.anthropic.claude-opus-4-5", '
+                    '"region_name": "us-east-1"}'
+                    "  — leave aws_access_key_id empty for IAM role / env-var 
auth"
+                ),
+            },
+        }
+
+    def _get_provider_kwargs(
+        self,
+        api_key: str | None,
+        base_url: str | None,
+        extra: dict[str, Any],
+    ) -> dict[str, Any]:
+        _str_keys = (
+            "aws_access_key_id",
+            "aws_secret_access_key",
+            "aws_session_token",
+            "region_name",
+            "profile_name",
+            # Bearer-token auth (alternative to IAM key/secret).
+            # Maps to AWS_BEARER_TOKEN_BEDROCK env var.
+            "api_key",
+            # Custom Bedrock runtime endpoint.
+            "base_url",
+        )
+        kwargs: dict[str, Any] = {k: extra[k] for k in _str_keys if 
extra.get(k) is not None}
+        # BedrockProvider expects float for timeout values; JSON integers must 
be coerced.
+        for _timeout_key in ("aws_read_timeout", "aws_connect_timeout"):
+            if extra.get(_timeout_key) is not None:
+                kwargs[_timeout_key] = float(extra[_timeout_key])
+        return kwargs
+
+
+class PydanticAIVertexHook(PydanticAIHook):
+    """
+    Hook for Google Vertex AI (or Generative Language API) via pydantic-ai.
+
+    Credentials are resolved in order:
+
+    1. ``service_account_file`` (path string) or ``service_account_info`` (JSON
+       object) in ``extra`` — loaded into a 
``google.auth.credentials.Credentials``
+       object and passed as ``credentials`` to ``GoogleProvider``.
+    2. ``api_key`` in ``extra`` — for Generative Language API (non-Vertex) or
+       Vertex API-key auth.
+    3. Application Default Credentials (``GOOGLE_APPLICATION_CREDENTIALS``,
+       ``gcloud auth application-default login``, Workload Identity, …) when
+       no explicit credentials are provided.
+
+    Connection fields:
+        - **extra** JSON::
+
+            {
+                "model": "google-vertex:gemini-2.0-flash",
+                "project": "my-gcp-project",
+                "location": "us-central1",
+                "service_account_file": "/path/to/sa.json",
+                "vertexai": true,
+            }
+
+        Use ``"service_account_info"`` instead of ``"service_account_file"`` to
+        embed the service-account JSON directly (as an object, not a string 
path).
+        Setting both at the same time raises ``ValueError``.
+
+        Set ``"vertexai": true`` to force Vertex AI mode when only ``api_key`` 
is
+        provided.  Omit ``vertexai`` for the Generative Language API (GLA).
+
+    :param llm_conn_id: Airflow connection ID.
+    :param model_id: Model identifier, e.g. 
``"google-vertex:gemini-2.0-flash"``.
+    """
+
+    conn_type = "pydanticai_vertex"
+    hook_name = "Pydantic AI (Google Vertex AI)"
+
+    @staticmethod
+    def get_ui_field_behaviour() -> dict[str, Any]:
+        """Return custom field behaviour for the Airflow connection form."""
+        return {
+            "hidden_fields": ["schema", "port", "login", "host", "password"],
+            "relabeling": {},
+            "placeholders": {
+                "extra": (
+                    '{"model": "google-vertex:gemini-2.0-flash", '
+                    '"project": "my-project", "location": "us-central1", 
"vertexai": true}'
+                    "  — add service_account_file (path) or 
service_account_info (object) for SA auth;"
+                    " omit both to use Application Default Credentials"
+                ),
+            },
+        }
+
+    def _get_provider_kwargs(
+        self,
+        api_key: str | None,
+        base_url: str | None,
+        extra: dict[str, Any],
+    ) -> dict[str, Any]:
+        sa_file = extra.get("service_account_file")
+        sa_info = extra.get("service_account_info")
+        if sa_file and sa_info:
+            raise ValueError(
+                "Specify 'service_account_file' or 'service_account_info' in 
the connection extra, not both."
+            )
+
+        kwargs: dict[str, Any] = {}
+
+        # Direct GoogleProvider scalar kwargs.
+        for _key in ("api_key", "project", "location", "base_url"):
+            if extra.get(_key) is not None:
+                kwargs[_key] = extra[_key]
+
+        # Optional vertexai bool flag (force Vertex AI mode for API-key auth).
+        _vertexai = extra.get("vertexai")
+        if _vertexai is not None:
+            kwargs["vertexai"] = bool(_vertexai)
+
+        # Service-account credentials — loaded lazily to avoid importing
+        # google-auth on non-Vertex code paths (optional heavy dependency).
+        if sa_file:
+            from google.oauth2 import service_account  # lazy: optional dep
+
+            kwargs["credentials"] = 
service_account.Credentials.from_service_account_file(
+                sa_file,
+                scopes=["https://www.googleapis.com/auth/cloud-platform"],
+            )
+        elif sa_info:
+            from google.oauth2 import service_account  # lazy: optional dep
+
+            kwargs["credentials"] = 
service_account.Credentials.from_service_account_info(
+                sa_info,
+                scopes=["https://www.googleapis.com/auth/cloud-platform"],
+            )
+
+        return kwargs
+
+
+# ---------------------------------------------------------------------------
+# Hook registry — maps conn_type → hook class for use by for_connection()
+# ---------------------------------------------------------------------------
+_CONN_TYPE_TO_HOOK: dict[str, type[PydanticAIHook]] = {

Review Comment:
   `_CONN_TYPE_TO_HOOK` + `for_connection()` duplicates what Airflow's 
`ProvidersManager` already does. The provider manager reads `provider.yaml` and 
maps `conn_type -> hook class` automatically. The operator can just do:
   
   ```python
   @cached_property
   def llm_hook(self) -> PydanticAIHook:
       return PydanticAIHook.get_hook(conn_id=self.llm_conn_id)
   ```
   
   Or use `ProvidersManager().hooks` if you need more control. The manual 
registry will drift when someone adds a new subclass and forgets to update this 
dict.
   
   Remove `_CONN_TYPE_TO_HOOK`, `for_connection()`, and let the framework 
handle the dispatch.



##########
providers/common/ai/tests/unit/common/ai/hooks/test_pydantic_ai.py:
##########
@@ -16,13 +16,20 @@
 # under the License.
 from __future__ import annotations
 
+import json
+import sys
 from unittest.mock import MagicMock, patch
 
 import pytest
 from pydantic_ai.models import Model
 
 from airflow.models.connection import Connection
-from airflow.providers.common.ai.hooks.pydantic_ai import PydanticAIHook
+from airflow.providers.common.ai.hooks.pydantic_ai import (
+    PydanticAIAzureHook,
+    PydanticAIBedrockHook,
+    PydanticAIHook,

Review Comment:
   The `test_azure_hook_uses_own_default_conn_name` test asserts 
`hook.llm_conn_id == "pydanticai_default"`. That means `PydanticAIAzureHook()` 
with no args will try to fetch a connection of type `pydanticai` (the generic 
one), not `pydanticai_azure`. The conn_type won't match.
   
   Consider giving each subclass its own `default_conn_name` (e.g. 
`pydanticai_azure_default`), or at least make the test name/comment reflect 
that this is intentional sharing, not "uses own" default.



##########
providers/common/ai/src/airflow/providers/common/ai/hooks/pydantic_ai.py:
##########
@@ -75,62 +75,99 @@ def get_ui_field_behaviour() -> dict[str, Any]:
             "hidden_fields": ["schema", "port", "login"],
             "relabeling": {"password": "API Key"},
             "placeholders": {
-                "host": "https://api.openai.com/v1 (optional, for custom 
endpoints)",
+                "host": "https://api.openai.com/v1  (optional, for custom 
endpoints / Ollama)",
+                "extra": '{"model": "openai:gpt-5.3"}',
             },
         }
 
+    # ------------------------------------------------------------------
+    # Core connection / agent API
+    # ------------------------------------------------------------------
+
+    def _get_provider_kwargs(
+        self,
+        api_key: str | None,
+        base_url: str | None,
+        extra: dict[str, Any],
+    ) -> dict[str, Any]:
+        """
+        Return the kwargs to pass to the provider constructor.
+
+        Subclasses override this method to map their connection fields to the
+        parameters expected by their specific provider class.  The base
+        implementation handles the common ``api_key`` / ``base_url`` pattern
+        used by OpenAI, Anthropic, Groq, Mistral, Ollama, and most other
+        providers.
+
+        :param api_key: Value of ``conn.password``.
+        :param base_url: Value of ``conn.host``.
+        :param extra: Deserialized ``conn.extra`` JSON.
+        :return: Kwargs forwarded to ``provider_cls(**kwargs)``.  Empty dict
+            signals that no explicit credentials are available and the hook
+            should fall back to environment-variable–based auth.
+        """
+        kwargs: dict[str, Any] = {}
+        if api_key:
+            kwargs["api_key"] = api_key
+        if base_url:
+            kwargs["base_url"] = base_url
+        return kwargs
+
     def get_conn(self) -> Model:
         """
-        Return a configured pydantic-ai Model.
+        Return a configured pydantic-ai ``Model``.
 
-        Reads API key from connection password, base_url from connection host,
-        and model from (in priority order):
+        Resolution order:
 
-        1. ``model_id`` parameter on the hook
-        2. ``extra["model"]`` on the connection (set by the "Model" conn-field 
in the UI)
+        1. **Explicit credentials** — when :meth:`_get_provider_kwargs` returns
+           a non-empty dict the provider class is instantiated with those 
kwargs
+           and wrapped in a ``provider_factory``.
+        2. **Default resolution** — delegates to pydantic-ai ``infer_model``
+           which reads standard env vars (``OPENAI_API_KEY``, ``AWS_PROFILE``, 
…).
 
-        The result is cached for the lifetime of this hook instance.
+        The resolved model is cached for the lifetime of this hook instance.
         """
         if self._model is not None:
             return self._model
 
         conn = self.get_connection(self.llm_conn_id)
-        model_name: str | KnownModelName = self.model_id or 
conn.extra_dejson.get("model", "")
+
+        extra: dict[str, Any] = conn.extra_dejson
+        model_name: str | KnownModelName = self.model_id or extra.get("model", 
"")
         if not model_name:
             raise ValueError(
                 "No model specified. Set model_id on the hook or the Model 
field on the connection."
             )
-        api_key = conn.password
-        base_url = conn.host or None
 
-        if not api_key and not base_url:
-            # No credentials to inject — use default provider resolution
-            # (picks up env vars like OPENAI_API_KEY, AWS_PROFILE, etc.)
-            self._model = infer_model(model_name)
+        api_key: str | None = conn.password or None
+        base_url: str | None = conn.host or None
+
+        provider_kwargs = self._get_provider_kwargs(api_key, base_url, extra)
+        if provider_kwargs:
+            _kwargs = provider_kwargs  # capture for closure
+            self.log.info(
+                "Using explicit credentials for provider with model '%s': %s",
+                model_name,
+                list(provider_kwargs),
+            )
+
+            def _provider_factory(pname: str) -> Any:
+                try:
+                    return infer_provider_class(pname)(**_kwargs)
+                except TypeError as exc:

Review Comment:
   This `except TypeError` still swallows real bugs (flagged in the previous 
round). It catches both "provider doesn't accept these kwargs" and genuine 
errors like wrong types or missing required args. The warning only logs kwarg 
*names*, not the actual error.
   
   With the subclass approach, each hook maps kwargs explicitly for its 
provider. A `TypeError` from the provider constructor is almost certainly a 
real bug now, not an expected mismatch. Silently falling back to env-var auth 
hides misconfiguration from users.
   
   For the base class (OpenAI/Anthropic/Groq), `api_key` and `base_url` are 
standard params that all these providers accept. The fallback was a workaround 
for the old generic approach where you didn't know what the provider accepted. 
That's no longer the case.
   
   Remove the try/except and let the TypeError propagate. If a user 
misconfigures their connection, they should see a clear error, not a silent 
downgrade.



##########
providers/common/ai/src/airflow/providers/common/ai/hooks/pydantic_ai.py:
##########
@@ -157,13 +194,266 @@ def test_connection(self) -> tuple[bool, str]:
         """
         Test connection by resolving the model.
 
-        Validates that the model string is valid, the provider package is
-        installed, and the provider class can be instantiated. Does NOT make an
-        LLM API call — that would be expensive, flaky, and fail for reasons
-        unrelated to connectivity (quotas, billing, rate limits).
+        Validates that the model string is valid and the provider class can be
+        instantiated with the supplied credentials.  Does NOT make an LLM API
+        call — that would be expensive and fail for reasons unrelated to
+        connectivity (quotas, billing, rate limits).
         """
         try:
             self.get_conn()
             return True, "Model resolved successfully."
         except Exception as e:
             return False, str(e)
+
+    @classmethod
+    def for_connection(cls, conn_id: str, model_id: str | None = None) -> 
PydanticAIHook:
+        """
+        Return the correct :class:`PydanticAIHook` subclass for *conn_id*.
+
+        Looks up the connection's ``conn_type`` in the registered hook map and
+        instantiates the matching subclass.  Falls back to
+        :class:`PydanticAIHook` for unknown types.
+
+        :param conn_id: Airflow connection ID.
+        :param model_id: Optional model override forwarded to the hook.
+        """
+        conn = cls.get_connection(conn_id)
+        hook_cls = _CONN_TYPE_TO_HOOK.get(conn.conn_type or "", cls)
+        return hook_cls(llm_conn_id=conn_id, model_id=model_id)
+
+
+class PydanticAIAzureHook(PydanticAIHook):
+    """
+    Hook for Azure OpenAI via pydantic-ai.
+
+    Connection fields:
+        - **password**: Azure API key
+        - **host**: Azure endpoint (e.g. 
``https://<resource>.openai.azure.com``)
+        - **extra** JSON::
+
+            {"model": "azure:gpt-4o", "api_version": "2024-07-01-preview"}
+
+    :param llm_conn_id: Airflow connection ID.
+    :param model_id: Model identifier, e.g. ``"azure:gpt-4o"``.
+    """
+
+    conn_type = "pydanticai_azure"
+    hook_name = "Pydantic AI (Azure OpenAI)"
+
+    @staticmethod
+    def get_ui_field_behaviour() -> dict[str, Any]:
+        """Return custom field behaviour for the Airflow connection form."""
+        return {
+            "hidden_fields": ["schema", "port", "login"],
+            "relabeling": {"password": "API Key", "host": "Azure Endpoint"},
+            "placeholders": {
+                "host": "https://<resource>.openai.azure.com",
+                "extra": '{"model": "azure:gpt-4o", "api_version": 
"2024-07-01-preview"}',
+            },
+        }
+
+    def _get_provider_kwargs(
+        self,
+        api_key: str | None,
+        base_url: str | None,
+        extra: dict[str, Any],
+    ) -> dict[str, Any]:
+        kwargs: dict[str, Any] = {}
+        if api_key:
+            kwargs["api_key"] = api_key
+        if base_url:
+            kwargs["azure_endpoint"] = base_url
+        if extra.get("api_version"):
+            kwargs["api_version"] = extra["api_version"]
+        return kwargs
+
+
+class PydanticAIBedrockHook(PydanticAIHook):
+    """
+    Hook for AWS Bedrock via pydantic-ai.
+
+    Credentials are resolved in order:
+
+    1. IAM keys from ``extra`` (``aws_access_key_id`` + 
``aws_secret_access_key``,
+       optionally ``aws_session_token``).
+    2. Bearer token in ``extra`` (``api_key``, maps to env 
``AWS_BEARER_TOKEN_BEDROCK``).
+    3. Environment-variable / instance-role chain (``AWS_PROFILE``, IAM role, 
…)
+       when no explicit keys are provided.
+
+    Connection fields:
+        - **extra** JSON::
+
+            {
+              "model": "bedrock:us.anthropic.claude-opus-4-5",
+              "region_name": "us-east-1",
+              "aws_access_key_id": "AKIA...",
+              "aws_secret_access_key": "...",
+              "aws_session_token": "...",
+              "profile_name": "my-aws-profile",
+              "api_key": "bearer-token",
+              "base_url": "https://custom-bedrock-endpoint",
+              "aws_read_timeout": 60.0,
+              "aws_connect_timeout": 10.0
+            }
+
+          Leave ``aws_access_key_id`` / ``aws_secret_access_key`` and 
``api_key``
+          empty to use the default AWS credential chain.
+
+    :param llm_conn_id: Airflow connection ID.
+    :param model_id: Model identifier, e.g. 
``"bedrock:us.anthropic.claude-opus-4-5"``.
+    """
+
+    conn_type = "pydanticai_bedrock"
+    hook_name = "Pydantic AI (AWS Bedrock)"
+
+    @staticmethod
+    def get_ui_field_behaviour() -> dict[str, Any]:
+        """Return custom field behaviour for the Airflow connection form."""
+        return {
+            "hidden_fields": ["schema", "port", "login", "host", "password"],
+            "relabeling": {},
+            "placeholders": {
+                "extra": (
+                    '{"model": "bedrock:us.anthropic.claude-opus-4-5", '
+                    '"region_name": "us-east-1"}'
+                    "  — leave aws_access_key_id empty for IAM role / env-var 
auth"
+                ),
+            },
+        }
+
+    def _get_provider_kwargs(

Review Comment:
   Bedrock's `_get_provider_kwargs` reads `api_key` and `base_url` from `extra` 
(lines 338-340), but the base class `get_conn()` also passes `conn.password` as 
`api_key` and `conn.host` as `base_url` to this method. Those two arguments are 
silently ignored here.
   
   This works correctly since Bedrock hides `host`/`password` in the UI, but 
it's confusing for anyone reading the code. The method signature says it 
accepts `api_key` and `base_url`, then throws them away.
   
   Two options:
   1. Add a brief comment at the top: `# Bedrock reads all config from extra; 
api_key/base_url from conn.password/host are unused.`
   2. Or read `api_key`/`base_url` from the positional args too (mapping them 
to the Bedrock equivalents), so the standard connection fields still work even 
though the UI hides them.
   
   Option 1 is fine for now.



##########
providers/common/ai/src/airflow/providers/common/ai/hooks/pydantic_ai.py:
##########
@@ -157,13 +194,266 @@ def test_connection(self) -> tuple[bool, str]:
         """
         Test connection by resolving the model.
 
-        Validates that the model string is valid, the provider package is
-        installed, and the provider class can be instantiated. Does NOT make an
-        LLM API call — that would be expensive, flaky, and fail for reasons
-        unrelated to connectivity (quotas, billing, rate limits).
+        Validates that the model string is valid and the provider class can be
+        instantiated with the supplied credentials.  Does NOT make an LLM API
+        call — that would be expensive and fail for reasons unrelated to
+        connectivity (quotas, billing, rate limits).
         """
         try:
             self.get_conn()
             return True, "Model resolved successfully."
         except Exception as e:
             return False, str(e)
+
+    @classmethod
+    def for_connection(cls, conn_id: str, model_id: str | None = None) -> 
PydanticAIHook:
+        """
+        Return the correct :class:`PydanticAIHook` subclass for *conn_id*.
+
+        Looks up the connection's ``conn_type`` in the registered hook map and
+        instantiates the matching subclass.  Falls back to
+        :class:`PydanticAIHook` for unknown types.
+
+        :param conn_id: Airflow connection ID.
+        :param model_id: Optional model override forwarded to the hook.
+        """
+        conn = cls.get_connection(conn_id)
+        hook_cls = _CONN_TYPE_TO_HOOK.get(conn.conn_type or "", cls)
+        return hook_cls(llm_conn_id=conn_id, model_id=model_id)
+
+
+class PydanticAIAzureHook(PydanticAIHook):
+    """
+    Hook for Azure OpenAI via pydantic-ai.
+
+    Connection fields:
+        - **password**: Azure API key
+        - **host**: Azure endpoint (e.g. 
``https://<resource>.openai.azure.com``)
+        - **extra** JSON::
+
+            {"model": "azure:gpt-4o", "api_version": "2024-07-01-preview"}
+
+    :param llm_conn_id: Airflow connection ID.
+    :param model_id: Model identifier, e.g. ``"azure:gpt-4o"``.
+    """
+
+    conn_type = "pydanticai_azure"
+    hook_name = "Pydantic AI (Azure OpenAI)"
+
+    @staticmethod
+    def get_ui_field_behaviour() -> dict[str, Any]:
+        """Return custom field behaviour for the Airflow connection form."""
+        return {
+            "hidden_fields": ["schema", "port", "login"],
+            "relabeling": {"password": "API Key", "host": "Azure Endpoint"},
+            "placeholders": {
+                "host": "https://<resource>.openai.azure.com",
+                "extra": '{"model": "azure:gpt-4o", "api_version": 
"2024-07-01-preview"}',
+            },
+        }
+
+    def _get_provider_kwargs(
+        self,
+        api_key: str | None,
+        base_url: str | None,
+        extra: dict[str, Any],
+    ) -> dict[str, Any]:
+        kwargs: dict[str, Any] = {}
+        if api_key:
+            kwargs["api_key"] = api_key
+        if base_url:
+            kwargs["azure_endpoint"] = base_url
+        if extra.get("api_version"):
+            kwargs["api_version"] = extra["api_version"]
+        return kwargs
+
+
+class PydanticAIBedrockHook(PydanticAIHook):
+    """
+    Hook for AWS Bedrock via pydantic-ai.
+
+    Credentials are resolved in order:
+
+    1. IAM keys from ``extra`` (``aws_access_key_id`` + 
``aws_secret_access_key``,
+       optionally ``aws_session_token``).
+    2. Bearer token in ``extra`` (``api_key``, maps to env 
``AWS_BEARER_TOKEN_BEDROCK``).
+    3. Environment-variable / instance-role chain (``AWS_PROFILE``, IAM role, 
…)
+       when no explicit keys are provided.
+
+    Connection fields:
+        - **extra** JSON::
+
+            {
+              "model": "bedrock:us.anthropic.claude-opus-4-5",
+              "region_name": "us-east-1",
+              "aws_access_key_id": "AKIA...",
+              "aws_secret_access_key": "...",
+              "aws_session_token": "...",
+              "profile_name": "my-aws-profile",
+              "api_key": "bearer-token",
+              "base_url": "https://custom-bedrock-endpoint",
+              "aws_read_timeout": 60.0,
+              "aws_connect_timeout": 10.0
+            }
+
+          Leave ``aws_access_key_id`` / ``aws_secret_access_key`` and 
``api_key``
+          empty to use the default AWS credential chain.
+
+    :param llm_conn_id: Airflow connection ID.
+    :param model_id: Model identifier, e.g. 
``"bedrock:us.anthropic.claude-opus-4-5"``.
+    """
+
+    conn_type = "pydanticai_bedrock"
+    hook_name = "Pydantic AI (AWS Bedrock)"
+
+    @staticmethod
+    def get_ui_field_behaviour() -> dict[str, Any]:
+        """Return custom field behaviour for the Airflow connection form."""
+        return {
+            "hidden_fields": ["schema", "port", "login", "host", "password"],
+            "relabeling": {},
+            "placeholders": {
+                "extra": (
+                    '{"model": "bedrock:us.anthropic.claude-opus-4-5", '
+                    '"region_name": "us-east-1"}'
+                    "  — leave aws_access_key_id empty for IAM role / env-var 
auth"
+                ),
+            },
+        }
+
+    def _get_provider_kwargs(
+        self,
+        api_key: str | None,
+        base_url: str | None,
+        extra: dict[str, Any],
+    ) -> dict[str, Any]:
+        _str_keys = (
+            "aws_access_key_id",
+            "aws_secret_access_key",
+            "aws_session_token",
+            "region_name",
+            "profile_name",
+            # Bearer-token auth (alternative to IAM key/secret).
+            # Maps to AWS_BEARER_TOKEN_BEDROCK env var.
+            "api_key",
+            # Custom Bedrock runtime endpoint.
+            "base_url",
+        )
+        kwargs: dict[str, Any] = {k: extra[k] for k in _str_keys if 
extra.get(k) is not None}
+        # BedrockProvider expects float for timeout values; JSON integers must 
be coerced.
+        for _timeout_key in ("aws_read_timeout", "aws_connect_timeout"):
+            if extra.get(_timeout_key) is not None:
+                kwargs[_timeout_key] = float(extra[_timeout_key])
+        return kwargs
+
+
+class PydanticAIVertexHook(PydanticAIHook):
+    """
+    Hook for Google Vertex AI (or Generative Language API) via pydantic-ai.
+
+    Credentials are resolved in order:
+
+    1. ``service_account_file`` (path string) or ``service_account_info`` (JSON
+       object) in ``extra`` — loaded into a 
``google.auth.credentials.Credentials``
+       object and passed as ``credentials`` to ``GoogleProvider``.
+    2. ``api_key`` in ``extra`` — for Generative Language API (non-Vertex) or
+       Vertex API-key auth.
+    3. Application Default Credentials (``GOOGLE_APPLICATION_CREDENTIALS``,
+       ``gcloud auth application-default login``, Workload Identity, …) when
+       no explicit credentials are provided.
+
+    Connection fields:
+        - **extra** JSON::
+
+            {
+                "model": "google-vertex:gemini-2.0-flash",
+                "project": "my-gcp-project",
+                "location": "us-central1",
+                "service_account_file": "/path/to/sa.json",
+                "vertexai": true,
+            }
+
+        Use ``"service_account_info"`` instead of ``"service_account_file"`` to
+        embed the service-account JSON directly (as an object, not a string 
path).
+        Setting both at the same time raises ``ValueError``.
+
+        Set ``"vertexai": true`` to force Vertex AI mode when only ``api_key`` 
is
+        provided.  Omit ``vertexai`` for the Generative Language API (GLA).
+
+    :param llm_conn_id: Airflow connection ID.
+    :param model_id: Model identifier, e.g. 
``"google-vertex:gemini-2.0-flash"``.
+    """
+
+    conn_type = "pydanticai_vertex"
+    hook_name = "Pydantic AI (Google Vertex AI)"
+
+    @staticmethod
+    def get_ui_field_behaviour() -> dict[str, Any]:
+        """Return custom field behaviour for the Airflow connection form."""
+        return {
+            "hidden_fields": ["schema", "port", "login", "host", "password"],
+            "relabeling": {},
+            "placeholders": {
+                "extra": (
+                    '{"model": "google-vertex:gemini-2.0-flash", '
+                    '"project": "my-project", "location": "us-central1", 
"vertexai": true}'
+                    "  — add service_account_file (path) or 
service_account_info (object) for SA auth;"
+                    " omit both to use Application Default Credentials"
+                ),
+            },
+        }
+
+    def _get_provider_kwargs(
+        self,
+        api_key: str | None,
+        base_url: str | None,
+        extra: dict[str, Any],
+    ) -> dict[str, Any]:
+        sa_file = extra.get("service_account_file")
+        sa_info = extra.get("service_account_info")
+        if sa_file and sa_info:
+            raise ValueError(
+                "Specify 'service_account_file' or 'service_account_info' in 
the connection extra, not both."
+            )
+
+        kwargs: dict[str, Any] = {}
+
+        # Direct GoogleProvider scalar kwargs.
+        for _key in ("api_key", "project", "location", "base_url"):
+            if extra.get(_key) is not None:
+                kwargs[_key] = extra[_key]
+
+        # Optional vertexai bool flag (force Vertex AI mode for API-key auth).
+        _vertexai = extra.get("vertexai")
+        if _vertexai is not None:
+            kwargs["vertexai"] = bool(_vertexai)
+
+        # Service-account credentials — loaded lazily to avoid importing
+        # google-auth on non-Vertex code paths (optional heavy dependency).
+        if sa_file:
+            from google.oauth2 import service_account  # lazy: optional dep
+
+            kwargs["credentials"] = 
service_account.Credentials.from_service_account_file(

Review Comment:
   `service_account_file` takes a raw file path from connection extras and 
passes it to `Credentials.from_service_account_file()`. Anyone with 
connection-edit permissions (lower privilege than DAG deployment) can point 
this at arbitrary files on the worker filesystem.
   
   This is the same class of concern as the `importlib.import_module` issue 
from the previous round.
   
   Options:
   1. Drop `service_account_file` and only support `service_account_info` 
(inline JSON). Users embed the key content directly in the connection. No path 
traversal possible.
   2. If you keep it, add a note in the docstring that this field requires the 
same trust level as DAG deployment access.
   
   I'd go with option 1 for the initial PR. `service_account_info` covers the 
same use case without the risk.



##########
providers/common/ai/src/airflow/providers/common/ai/get_provider_info.py:
##########
@@ -74,6 +74,139 @@ def get_provider_info():
                     }
                 },
             },

Review Comment:
   This file is auto-generated from `provider.yaml`. Manual edits here get 
overwritten when someone runs `prek run update-providers-build-files`. (Flagged 
this in the previous round too.)
   
   Your `provider.yaml` changes look correct, so just delete these manual edits 
and regenerate:
   
   ```bash
   prek run update-providers-build-files
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to