kaxil commented on code in PR #64433:
URL: https://github.com/apache/airflow/pull/64433#discussion_r3019320011


##########
providers/common/ai/src/airflow/providers/common/ai/toolsets/sql.py:
##########
@@ -147,6 +148,40 @@ def __init__(
     def id(self) -> str:
         return f"sql-{self._db_conn_id}"
 
+    def describe_policy_exposure(self) -> ToolsetExposure:
+        resources = [
+            ResourceExposure(
+                category="database",
+                name=self._db_conn_id,
+                access_mode="read_write" if self._allow_writes else "read",
+                details={"schema": self._schema} if self._schema else {},
+            )
+        ]
+        for table_name in sorted(self._allowed_tables or ()):
+            resources.append(ResourceExposure(category="table", name=table_name, access_mode="read"))

Review Comment:
   Per-table resources are always `access_mode="read"`, even when 
`self._allow_writes` is `True`. The database-level entry correctly says 
`"read_write"`, but the individual table entries are inconsistent. Should this 
be:
   ```python
   access_mode="read_write" if self._allow_writes else "read"
   ```



##########
providers/common/ai/src/airflow/providers/common/ai/operators/agent.py:
##########
@@ -215,7 +224,59 @@ def _build_durable_toolsets(
 
         return [CachingToolset(wrapped=ts, storage=storage, counter=counter) for ts in toolsets]
 
+    def _build_policy_exposure_report(self, context: Context) -> PolicyExposureReport:
+        """Build a configured policy exposure snapshot from rendered operator configuration."""
+        ti = context["task_instance"]
+        llm_exposure = LLMExposure(
+            llm_conn_id=self.llm_conn_id,
+            connection_type=self.llm_hook.conn_type,

Review Comment:
   `self.llm_hook.conn_type` triggers the `@cached_property`, which resolves 
the Airflow connection and instantiates the hook. If the connection is 
misconfigured, the outer `try/except` swallows the error and the task then 
fails inside `_build_agent()` with a different traceback. The user sees both a 
"Failed to build policy exposure report" warning *and* the actual connection 
error, which can be confusing.
   
   Consider wrapping just the `conn_type` access:
   ```python
   try:
       connection_type = self.llm_hook.conn_type
   except Exception:
       connection_type = None
   ```
   Or just use `self.llm_conn_id` here and skip resolving the connection type 
in the report.



##########
providers/common/ai/src/airflow/providers/common/ai/toolsets/hook.py:
##########
@@ -265,3 +297,11 @@ def _serialize_for_llm(value: Any) -> str:
         return json.dumps(value, default=str)
     except (TypeError, ValueError):
         return str(value)
+
+
+def _looks_mutating(method_name: str) -> bool:
+    """Return True when a hook method name suggests write-like side effects."""
+    return any(
+        token in method_name.lower()
+        for token in ("create", "delete", "drop", "update", "insert", "write", "post", "put", "patch")

Review Comment:
   This misses some common mutating patterns: `remove`, `send`, `execute`, 
`upload`, `publish`. For example, `S3Hook.delete_objects` would match, but 
`S3Hook.upload_file` or `SlackHook.send_message` would not.



##########
providers/common/ai/src/airflow/providers/common/ai/utils/policy_exposure.py:
##########
@@ -0,0 +1,204 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Shared models and helpers for configured policy exposure snapshots."""
+
+from __future__ import annotations
+
+from datetime import datetime, timezone
+from typing import Any, Literal
+
+from pydantic import BaseModel, Field
+from pydantic_ai.toolsets.wrapper import WrapperToolset
+
+XCOM_POLICY_EXPOSURE = "airflow_common_ai_policy_exposure"
+
+PolicyRiskLevel = Literal["low", "medium", "high"]
+ResourceCategory = Literal[
+    "database",
+    "table",
+    "schema",
+    "datasource",
+    "uri",
+    "hook_method",
+    "mcp_server",
+    "tool_prefix",
+    "unknown",
+]
+AccessMode = Literal["read", "write", "read_write", "unknown"]
+
+
+class TaskIdentity(BaseModel):
+    """Identifying information for the task instance that produced the report."""
+
+    dag_id: str
+    run_id: str
+    task_id: str
+    map_index: int = -1
+    operator_type: str
+
+
+class LLMExposure(BaseModel):
+    """Configuration-derived LLM access surface."""
+
+    llm_conn_id: str
+    connection_type: str | None = None
+    model_id: str | None = None
+
+
+class ApprovalExposure(BaseModel):
+    """Review and approval controls applied to the task."""
+
+    enable_hitl_review: bool = False
+    max_hitl_iterations: int | None = None
+
+
+class ResourceExposure(BaseModel):
+    """A single configured resource or capability exposed to the agent."""
+
+    category: ResourceCategory
+    name: str
+    access_mode: AccessMode = "unknown"
+    details: dict[str, Any] = Field(default_factory=dict)
+
+
+class ToolsetExposure(BaseModel):
+    """Exposure summary for one configured toolset."""
+
+    toolset_type: str
+    toolset_id: str | None = None
+    summary: str
+    resources: list[ResourceExposure] = Field(default_factory=list)
+    risk_flags: list[str] = Field(default_factory=list)
+
+
+class PolicyRiskSummary(BaseModel):
+    """High-level risk summary for the configured exposure snapshot."""
+
+    level: PolicyRiskLevel
+    reasons: list[str] = Field(default_factory=list)
+
+
+class PolicyExposureReport(BaseModel):
+    """Configured policy exposure snapshot for an AI task instance."""
+
+    captured_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
+    task: TaskIdentity
+    llm: LLMExposure
+    approval: ApprovalExposure
+    toolsets: list[ToolsetExposure] = Field(default_factory=list)
+    runtime_notes: list[str] = Field(default_factory=list)
+    risk: PolicyRiskSummary
+
+
+def classify_policy_risk(
+    *, llm: LLMExposure, toolsets: list[ToolsetExposure], runtime_notes: list[str]
+) -> PolicyRiskSummary:
+    """Classify overall risk using deterministic rules based on configured access."""
+    reasons: list[str] = []
+    has_write_access = any(
+        resource.access_mode in {"write", "read_write"}
+        for toolset in toolsets
+        for resource in toolset.resources
+    )
+
+    for toolset in toolsets:
+        reasons.extend(toolset.risk_flags)
+
+    if has_write_access and not any(
+        "write-capable" in reason or "write access" in reason for reason in reasons
+    ):
+        reasons.append("write-capable tool access configured")
+
+    deduped_reasons = _dedupe_reasons(reasons)
+
+    if any(
+        reason in deduped_reasons
+        for reason in ("unknown toolset exposure", "potentially mutating hook methods exposed")
+    ):
+        return PolicyRiskSummary(level="high", reasons=deduped_reasons or ["unknown toolset exposure"])
+
+    if has_write_access:
+        return PolicyRiskSummary(level="high", reasons=deduped_reasons)
+
+    if deduped_reasons:
+        return PolicyRiskSummary(level="medium", reasons=deduped_reasons)
+
+    if runtime_notes:
+        return PolicyRiskSummary(level="low", reasons=["configured access includes runtime controls"])

Review Comment:
   The reason `"configured access includes runtime controls"` fires when 
`runtime_notes` has entries like `"tool logging enabled"` or `"durable replay 
enabled"`. These are observability features, not governance controls. Something 
like `"no external tool access configured"` (same as the no-runtime-notes 
branch) might be more accurate here, since the risk level is `"low"` either way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to