This is an automated email from the ASF dual-hosted git repository.
gopidesu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7fa400745ac Add GetDag endpoint to execution_api (#56955)
7fa400745ac is described below
commit 7fa400745ac7aebc7cc4ec21d3a047e9fb258310
Author: GPK <[email protected]>
AuthorDate: Wed Mar 18 20:50:26 2026 +0000
Add GetDag endpoint to execution_api (#56955)
* Add GetDagState endpoint to execution_api
* Fixup tests
* Fixup tests
* Fix static checks
* Fixup tests
* Fixup tests
* Fixup return type
* Fixup tests and mypy
* Add cadwyn migration
* Add tests
* Resolve comments round1
* Replace DAG state endpoint with DAG details endpoint
---
.../api_fastapi/execution_api/datamodels/dags.py | 35 ++++++
.../api_fastapi/execution_api/routes/__init__.py | 2 +
.../api_fastapi/execution_api/routes/dags.py | 59 ++++++++++
.../api_fastapi/execution_api/versions/__init__.py | 2 +
.../execution_api/versions/v2026_04_13.py | 28 +++++
.../execution_api/versions/head/test_dags.py | 128 +++++++++++++++++++++
.../versions/v2026_03_31/test_dags.py | 34 ++++++
.../tests/unit/dag_processing/test_processor.py | 2 +
airflow-core/tests/unit/jobs/test_triggerer_job.py | 2 +
task-sdk/src/airflow/sdk/api/client.py | 19 +++
.../src/airflow/sdk/api/datamodels/_generated.py | 17 ++-
task-sdk/src/airflow/sdk/execution_time/comms.py | 23 ++++
.../src/airflow/sdk/execution_time/supervisor.py | 7 ++
.../src/airflow/sdk/execution_time/task_runner.py | 12 ++
task-sdk/src/airflow/sdk/types.py | 4 +
task-sdk/tests/task_sdk/api/test_client.py | 88 +++++++++++++-
.../task_sdk/execution_time/test_supervisor.py | 35 +++++-
.../task_sdk/execution_time/test_task_runner.py | 25 ++++
18 files changed, 519 insertions(+), 3 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dags.py
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dags.py
new file mode 100644
index 00000000000..1334e99069f
--- /dev/null
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dags.py
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from datetime import datetime
+
+from airflow.api_fastapi.core_api.base import BaseModel
+
+
+class DagResponse(BaseModel):
+ """Schema for DAG response."""
+
+ dag_id: str
+ is_paused: bool
+ bundle_name: str | None
+ bundle_version: str | None
+ relative_fileloc: str | None
+ owners: str | None
+ tags: list[str]
+ next_dagrun: datetime | None
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/__init__.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/__init__.py
index aeef4d092b1..a076592d647 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/__init__.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/__init__.py
@@ -24,6 +24,7 @@ from airflow.api_fastapi.execution_api.routes import (
assets,
connections,
dag_runs,
+ dags,
health,
hitl,
task_instances,
@@ -43,6 +44,7 @@ authenticated_router.include_router(assets.router,
prefix="/assets", tags=["Asse
authenticated_router.include_router(asset_events.router,
prefix="/asset-events", tags=["Asset Events"])
authenticated_router.include_router(connections.router, prefix="/connections",
tags=["Connections"])
authenticated_router.include_router(dag_runs.router, prefix="/dag-runs",
tags=["Dag Runs"])
+authenticated_router.include_router(dags.router, prefix="/dags", tags=["Dags"])
authenticated_router.include_router(task_instances.router,
prefix="/task-instances", tags=["Task Instances"])
authenticated_router.include_router(
task_reschedules.router, prefix="/task-reschedules", tags=["Task
Reschedules"]
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dags.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dags.py
new file mode 100644
index 00000000000..9061b486216
--- /dev/null
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dags.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from fastapi import APIRouter, HTTPException, status
+
+from airflow.api_fastapi.common.db.common import SessionDep
+from airflow.api_fastapi.execution_api.datamodels.dags import DagResponse
+from airflow.models.dag import DagModel
+
+router = APIRouter()
+
+
[email protected](
+ "/{dag_id}",
+ responses={
+ status.HTTP_404_NOT_FOUND: {"description": "DAG not found for the
given dag_id"},
+ },
+)
+def get_dag(
+ dag_id: str,
+ session: SessionDep,
+) -> DagResponse:
+ """Get a DAG."""
+ dag_model: DagModel | None = session.get(DagModel, dag_id)
+ if not dag_model:
+ raise HTTPException(
+ status.HTTP_404_NOT_FOUND,
+ detail={
+ "reason": "not_found",
+ "message": f"The Dag with dag_id: `{dag_id}` was not found",
+ },
+ )
+
+ return DagResponse(
+ dag_id=dag_model.dag_id,
+ is_paused=dag_model.is_paused,
+ bundle_name=dag_model.bundle_name,
+ bundle_version=dag_model.bundle_version,
+ relative_fileloc=dag_model.relative_fileloc,
+ owners=dag_model.owners,
+ tags=sorted(tag.name for tag in dag_model.tags),
+ next_dagrun=dag_model.next_dagrun,
+ )
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
index f4f2d967e02..2cbe2e3007b 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
@@ -39,9 +39,11 @@ from airflow.api_fastapi.execution_api.versions.v2026_03_31
import (
ModifyDeferredTaskKwargsToJsonValue,
RemoveUpstreamMapIndexesField,
)
+from airflow.api_fastapi.execution_api.versions.v2026_04_13 import
AddDagEndpoint
bundle = VersionBundle(
HeadVersion(),
+ Version("2026-04-13", AddDagEndpoint),
Version(
"2026-03-31",
MakeDagRunStartDateNullable,
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_13.py
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_13.py
new file mode 100644
index 00000000000..95da513d7bc
--- /dev/null
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_13.py
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from cadwyn import VersionChange, endpoint
+
+
+class AddDagEndpoint(VersionChange):
+ """Add the `/dags/{dag_id}` endpoint."""
+
+ description = __doc__
+
+ instructions_to_migrate_to_previous_version = (endpoint("/dags/{dag_id}",
["GET"]).didnt_exist,)
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dags.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dags.py
new file mode 100644
index 00000000000..78b8f74a1d6
--- /dev/null
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dags.py
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from datetime import datetime, timezone
+from unittest.mock import ANY
+
+import pytest
+from sqlalchemy import update
+
+from airflow.models import DagModel
+from airflow.providers.standard.operators.empty import EmptyOperator
+
+from tests_common.test_utils.db import clear_db_runs
+
+pytestmark = pytest.mark.db_test
+
+
+class TestDag:
+ def setup_method(self):
+ clear_db_runs()
+
+ def teardown_method(self):
+ clear_db_runs()
+
+ @pytest.mark.parametrize(
+ ("state", "expected"),
+ [
+ pytest.param(True, True),
+ pytest.param(False, False),
+ ],
+ )
+ def test_get_dag(self, client, session, dag_maker, state, expected):
+ """Test getting a DAG."""
+
+ dag_id = "test_get_dag"
+ next_dagrun = datetime(2026, 4, 13, tzinfo=timezone.utc)
+
+ with dag_maker(dag_id=dag_id, session=session, serialized=True,
tags=["z_tag", "a_tag"]):
+ EmptyOperator(task_id="test_task")
+
+ session.execute(
+ update(DagModel)
+ .where(DagModel.dag_id == dag_id)
+ .values(
+ is_paused=state,
+ bundle_version="bundle-version",
+ relative_fileloc="dags/example.py",
+ owners="owner_1",
+ next_dagrun=next_dagrun,
+ )
+ )
+
+ session.commit()
+
+ response = client.get(
+ f"/execution/dags/{dag_id}",
+ )
+
+ assert response.status_code == 200
+ assert response.json() == {
+ "dag_id": dag_id,
+ "is_paused": expected,
+ "bundle_name": "dag_maker",
+ "bundle_version": "bundle-version",
+ "relative_fileloc": "dags/example.py",
+ "owners": "owner_1",
+ "tags": ["a_tag", "z_tag"],
+ "next_dagrun": "2026-04-13T00:00:00Z",
+ }
+
+ def test_dag_not_found(self, client, session, dag_maker):
+ """Test Dag not found"""
+
+ dag_id = "test_get_dag"
+
+ response = client.get(
+ f"/execution/dags/{dag_id}",
+ )
+
+ assert response.status_code == 404
+ assert response.json() == {
+ "detail": {
+ "message": "The Dag with dag_id: `test_get_dag` was not found",
+ "reason": "not_found",
+ }
+ }
+
+ def test_get_dag_defaults(self, client, session, dag_maker):
+ """Test getting a DAG with default model values."""
+
+ dag_id = "test_get_dag_defaults"
+
+ with dag_maker(dag_id=dag_id, session=session, serialized=True):
+ EmptyOperator(task_id="test_task")
+
+ session.commit()
+
+ response = client.get(
+ f"/execution/dags/{dag_id}",
+ )
+
+ assert response.status_code == 200
+ assert response.json() == {
+ "dag_id": dag_id,
+ "is_paused": False,
+ "bundle_name": "dag_maker",
+ "bundle_version": None,
+ "relative_fileloc": "test_dags.py",
+ "owners": "airflow",
+ "tags": [],
+ "next_dagrun": ANY,
+ }
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_03_31/test_dags.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_03_31/test_dags.py
new file mode 100644
index 00000000000..72693443944
--- /dev/null
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_03_31/test_dags.py
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import pytest
+
+pytestmark = pytest.mark.db_test
+
+
[email protected]
+def old_ver_client(client):
+ client.headers["Airflow-API-Version"] = "2026-03-31"
+ return client
+
+
+def test_dag_endpoint_not_available_in_previous_version(old_ver_client):
+ response = old_ver_client.get("/execution/dags/test_dag")
+
+ assert response.status_code == 404
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py
b/airflow-core/tests/unit/dag_processing/test_processor.py
index 3dc57345f6f..c46aceda794 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -1911,6 +1911,7 @@ class TestDagProcessingMessageTypes:
"GetAssetEventByAssetAlias",
"GetDagRun",
"GetDagRunState",
+ "GetDag",
"GetDRCount",
"GetTaskBreadcrumbs",
"GetTaskRescheduleStartDate",
@@ -1935,6 +1936,7 @@ class TestDagProcessingMessageTypes:
in_task_runner_but_not_in_dag_processing_process = {
"AssetResult",
"AssetEventsResult",
+ "DagResult",
"DagRunResult",
"DagRunStateResult",
"DRCount",
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py
b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index 3d778804279..802a34192e3 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -1317,6 +1317,7 @@ class TestTriggererMessageTypes:
"ResendLoggingFD",
"CreateHITLDetailPayload",
"SetRenderedMapIndex",
+ "GetDag",
}
in_task_but_not_in_trigger_runner = {
@@ -1336,6 +1337,7 @@ class TestTriggererMessageTypes:
"PreviousDagRunResult",
"PreviousTIResult",
"HITLDetailRequestResult",
+ "DagResult",
}
supervisor_diff = (
diff --git a/task-sdk/src/airflow/sdk/api/client.py
b/task-sdk/src/airflow/sdk/api/client.py
index f7106bb4aa5..90374f76be5 100644
--- a/task-sdk/src/airflow/sdk/api/client.py
+++ b/task-sdk/src/airflow/sdk/api/client.py
@@ -45,6 +45,7 @@ from airflow.sdk.api.datamodels._generated import (
AssetEventsResponse,
AssetResponse,
ConnectionResponse,
+ DagResponse,
DagRun,
DagRunStateResponse,
DagRunType,
@@ -772,6 +773,18 @@ class DagRunOperations:
return PreviousDagRunResult(dag_run=resp.json())
+class DagsOperations:
+ __slots__ = ("client",)
+
+ def __init__(self, client: Client):
+ self.client = client
+
+ def get(self, dag_id: str) -> DagResponse:
+ """Get a DAG via the API server."""
+ resp = self.client.get(f"dags/{dag_id}")
+ return DagResponse.model_validate_json(resp.read())
+
+
class HITLOperations:
"""
Operations related to Human in the loop. Require Airflow 3.1+.
@@ -1012,6 +1025,12 @@ class Client(httpx.Client):
"""Operations related to HITL Responses."""
return HITLOperations(self)
+ @lru_cache() # type: ignore[misc]
+ @property
+ def dags(self) -> DagsOperations:
+ """Operations related to DAGs."""
+ return DagsOperations(self)
+
# This is only used for parsing. ServerResponseError is raised instead
class _ErrorBody(BaseModel):
diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
index 617e2a23934..b6c08e9d76c 100644
--- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
+++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
@@ -27,7 +27,7 @@ from uuid import UUID
from pydantic import AwareDatetime, BaseModel, ConfigDict, Field, JsonValue,
RootModel
-API_VERSION: Final[str] = "2026-03-31"
+API_VERSION: Final[str] = "2026-04-13"
class AssetAliasReferenceAssetEventDagRun(BaseModel):
@@ -78,6 +78,21 @@ class ConnectionResponse(BaseModel):
extra: Annotated[str | None, Field(title="Extra")] = None
+class DagResponse(BaseModel):
+ """
+ Schema for DAG response.
+ """
+
+ dag_id: Annotated[str, Field(title="Dag Id")]
+ is_paused: Annotated[bool, Field(title="Is Paused")]
+ bundle_name: Annotated[str | None, Field(title="Bundle Name")] = None
+ bundle_version: Annotated[str | None, Field(title="Bundle Version")] = None
+ relative_fileloc: Annotated[str | None, Field(title="Relative Fileloc")] =
None
+ owners: Annotated[str | None, Field(title="Owners")] = None
+ tags: Annotated[list[str], Field(title="Tags")]
+ next_dagrun: Annotated[AwareDatetime | None, Field(title="Next Dagrun")] =
None
+
+
class DagRunAssetReference(BaseModel):
"""
DagRun serializer for asset responses.
diff --git a/task-sdk/src/airflow/sdk/execution_time/comms.py
b/task-sdk/src/airflow/sdk/execution_time/comms.py
index 15755e640d9..2a9a9bbd4eb 100644
--- a/task-sdk/src/airflow/sdk/execution_time/comms.py
+++ b/task-sdk/src/airflow/sdk/execution_time/comms.py
@@ -71,6 +71,7 @@ from airflow.sdk.api.datamodels._generated import (
AssetResponse,
BundleInfo,
ConnectionResponse,
+ DagResponse,
DagRun,
DagRunStateResponse,
HITLDetailRequest,
@@ -694,6 +695,21 @@ class HITLDetailRequestResult(HITLDetailRequest):
return cls(**hitl_request.model_dump(exclude_defaults=True),
type="HITLDetailRequestResult")
+class DagResult(DagResponse):
+ type: Literal["DagResult"] = "DagResult"
+
+ @classmethod
+ def from_api_response(cls, dag_response: DagResponse) -> DagResult:
+ """
+ Create result class from API Response.
+
+ API Response is autogenerated from the API schema, so we need to
convert it to Result
+ for communication between the Supervisor and the task process since it
needs a
+ discriminator field.
+ """
+ return cls(**dag_response.model_dump(exclude_defaults=True),
type="DagResult")
+
+
ToTask = Annotated[
AssetResult
| AssetEventsResult
@@ -701,6 +717,7 @@ ToTask = Annotated[
| DagRunResult
| DagRunStateResult
| DRCount
+ | DagResult
| ErrorResponse
| PrevSuccessfulDagRunResult
| PreviousTIResult
@@ -1018,6 +1035,11 @@ class MaskSecret(BaseModel):
type: Literal["MaskSecret"] = "MaskSecret"
+class GetDag(BaseModel):
+ dag_id: str
+ type: Literal["GetDag"] = "GetDag"
+
+
ToSupervisor = Annotated[
DeferTask
| DeleteXCom
@@ -1029,6 +1051,7 @@ ToSupervisor = Annotated[
| GetDagRun
| GetDagRunState
| GetDRCount
+ | GetDag
| GetPrevSuccessfulDagRun
| GetPreviousDagRun
| GetPreviousTI
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 9894d4fff31..1dfefee5404 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -66,6 +66,7 @@ from airflow.sdk.execution_time.comms import (
AssetResult,
ConnectionResult,
CreateHITLDetailPayload,
+ DagResult,
DagRunResult,
DagRunStateResult,
DeferTask,
@@ -77,6 +78,7 @@ from airflow.sdk.execution_time.comms import (
GetAssetEventByAsset,
GetAssetEventByAssetAlias,
GetConnection,
+ GetDag,
GetDagRun,
GetDagRunState,
GetDRCount,
@@ -1477,6 +1479,11 @@ class ActivitySubprocess(WatchedSubprocess):
dump_opts = {"exclude_unset": True}
elif isinstance(msg, MaskSecret):
mask_secret(msg.value, msg.name)
+ elif isinstance(msg, GetDag):
+ dag = self.client.dags.get(
+ dag_id=msg.dag_id,
+ )
+ resp = DagResult.from_api_response(dag)
else:
log.error("Unhandled request", msg=msg)
self.send_msg(
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 611c4fc28ec..aa5a5a08ad4 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -73,10 +73,12 @@ from airflow.sdk.execution_time.callback_runner import
create_executable_runner
from airflow.sdk.execution_time.comms import (
AssetEventDagRunReferenceResult,
CommsDecoder,
+ DagResult,
DagRunStateResult,
DeferTask,
DRCount,
ErrorResponse,
+ GetDag,
GetDagRunState,
GetDRCount,
GetPreviousDagRun,
@@ -670,6 +672,16 @@ class RuntimeTaskInstance(TaskInstance):
return response.state
+ @staticmethod
+ def get_dag(dag_id: str) -> DagResult:
+ """Return the DAG with the given dag_id."""
+ response = SUPERVISOR_COMMS.send(msg=GetDag(dag_id=dag_id))
+
+ if TYPE_CHECKING:
+ assert isinstance(response, DagResult)
+
+ return response
+
@property
def log_url(self) -> str:
run_id = quote(self.run_id)
diff --git a/task-sdk/src/airflow/sdk/types.py
b/task-sdk/src/airflow/sdk/types.py
index 71c24474806..4ce43c57c11 100644
--- a/task-sdk/src/airflow/sdk/types.py
+++ b/task-sdk/src/airflow/sdk/types.py
@@ -39,6 +39,7 @@ if TYPE_CHECKING:
from airflow.sdk.definitions.asset import Asset, AssetAlias,
AssetAliasEvent, AssetRef, BaseAssetUniqueKey
from airflow.sdk.definitions.context import Context
from airflow.sdk.definitions.mappedoperator import MappedOperator
+ from airflow.sdk.execution_time.comms import DagResult
Operator: TypeAlias = BaseOperator | MappedOperator
@@ -183,6 +184,9 @@ class RuntimeTaskInstanceProtocol(Protocol):
@staticmethod
def get_dagrun_state(dag_id: str, run_id: str) -> str: ...
+ @staticmethod
+ def get_dag(dag_id: str) -> DagResult: ...
+
# Public alias for RuntimeTaskInstanceProtocol
class TaskInstance(RuntimeTaskInstanceProtocol):
diff --git a/task-sdk/tests/task_sdk/api/test_client.py
b/task-sdk/tests/task_sdk/api/test_client.py
index 6347d611a6a..7d960b76570 100644
--- a/task-sdk/tests/task_sdk/api/test_client.py
+++ b/task-sdk/tests/task_sdk/api/test_client.py
@@ -19,7 +19,7 @@ from __future__ import annotations
import json
import pickle
-from datetime import datetime
+from datetime import datetime, timezone as dt_timezone
from typing import TYPE_CHECKING
from unittest import mock
@@ -37,6 +37,7 @@ from airflow.sdk.api.datamodels._generated import (
AssetEventsResponse,
AssetResponse,
ConnectionResponse,
+ DagResponse,
DagRunState,
DagRunStateResponse,
HITLDetailRequest,
@@ -1566,3 +1567,88 @@ class TestSSLContextCaching:
assert ctx1 is not ctx2
assert info.misses == 2
assert info.currsize == 2
+
+
+class TestDagsOperations:
+ def test_get(self):
+ """Test that the client can get a dag."""
+
+ def handle_request(request: httpx.Request) -> httpx.Response:
+ if request.url.path == "/dags/test_dag":
+ return httpx.Response(
+ status_code=200,
+ json={
+ "dag_id": "test_dag",
+ "is_paused": False,
+ "bundle_name": "dags-folder",
+ "bundle_version": "bundle-version",
+ "relative_fileloc": "dags/example.py",
+ "owners": "owner_1",
+ "tags": ["a_tag", "z_tag"],
+ "next_dagrun": "2026-04-13T00:00:00Z",
+ },
+ )
+ return httpx.Response(status_code=200)
+
+ client = make_client(transport=httpx.MockTransport(handle_request))
+ result = client.dags.get(dag_id="test_dag")
+
+ assert result == DagResponse(
+ dag_id="test_dag",
+ is_paused=False,
+ bundle_name="dags-folder",
+ bundle_version="bundle-version",
+ relative_fileloc="dags/example.py",
+ owners="owner_1",
+ tags=["a_tag", "z_tag"],
+ next_dagrun=datetime(2026, 4, 13, tzinfo=dt_timezone.utc),
+ )
+
+ def test_get_not_found(self):
+ """Test that getting a missing dag raises a server response error."""
+
+ def handle_request(request: httpx.Request) -> httpx.Response:
+ if request.url.path == "/dags/missing_dag":
+ return httpx.Response(
+ status_code=404,
+ json={
+ "detail": {
+ "message": "The Dag with dag_id: `missing_dag` was
not found",
+ "reason": "not_found",
+ }
+ },
+ )
+ return httpx.Response(status_code=200)
+
+ client = make_client(transport=httpx.MockTransport(handle_request))
+
+ with pytest.raises(ServerResponseError) as exc_info:
+ client.dags.get(dag_id="missing_dag")
+
+ assert exc_info.value.response.status_code == 404
+ assert exc_info.value.detail == {
+ "detail": {
+ "message": "The Dag with dag_id: `missing_dag` was not found",
+ "reason": "not_found",
+ }
+ }
+
+ def test_get_server_error(self):
+ """Test that a server error while getting a dag."""
+
+ def handle_request(request: httpx.Request) -> httpx.Response:
+ if request.url.path == "/dags/test_dag":
+ return httpx.Response(
+ status_code=500,
+ headers=[("content-Type", "application/json")],
+ json={
+ "reason": "internal_server_error",
+ "message": "Internal Server Error",
+ },
+ )
+ return httpx.Response(status_code=200)
+
+ client = make_client(transport=httpx.MockTransport(handle_request))
+
+ with pytest.raises(ServerResponseError):
+ client.dags.get(dag_id="test_dag")
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 66eda6f5b87..ef6cd19b8d7 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -30,7 +30,7 @@ import sys
import time
from contextlib import nullcontext
from dataclasses import dataclass, field
-from datetime import datetime
+from datetime import datetime, timezone as dt_timezone
from operator import attrgetter
from random import randint
from textwrap import dedent
@@ -71,6 +71,7 @@ from airflow.sdk.execution_time.comms import (
CommsDecoder,
ConnectionResult,
CreateHITLDetailPayload,
+ DagResult,
DagRunResult,
DagRunStateResult,
DeferTask,
@@ -83,6 +84,7 @@ from airflow.sdk.execution_time.comms import (
GetAssetEventByAsset,
GetAssetEventByAssetAlias,
GetConnection,
+ GetDag,
GetDagRun,
GetDagRunState,
GetDRCount,
@@ -2499,6 +2501,37 @@ REQUEST_TEST_CASES = [
},
test_id="get_task_breadcrumbs",
),
+ RequestTestCase(
+ message=GetDag(dag_id="test_dag"),
+ expected_body={
+ "dag_id": "test_dag",
+ "is_paused": False,
+ "bundle_name": "dags-folder",
+ "bundle_version": "bundle-version",
+ "relative_fileloc": "dags/example.py",
+ "owners": "owner_1",
+ "tags": ["a_tag", "z_tag"],
+ "next_dagrun": datetime(2026, 4, 13, tzinfo=dt_timezone.utc),
+ "type": "DagResult",
+ },
+ client_mock=ClientMock(
+ method_path="dags.get",
+ kwargs={
+ "dag_id": "test_dag",
+ },
+ response=DagResult(
+ dag_id="test_dag",
+ is_paused=False,
+ bundle_name="dags-folder",
+ bundle_version="bundle-version",
+ relative_fileloc="dags/example.py",
+ owners="owner_1",
+ tags=["a_tag", "z_tag"],
+ next_dagrun=datetime(2026, 4, 13, tzinfo=dt_timezone.utc),
+ ),
+ ),
+ test_id="get_dag",
+ ),
]
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 47de56c384f..0eab6a50afc 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -80,11 +80,13 @@ from airflow.sdk.execution_time.comms import (
AssetEventsResult,
BundleInfo,
ConnectionResult,
+ DagResult,
DagRunStateResult,
DeferTask,
DRCount,
ErrorResponse,
GetConnection,
+ GetDag,
GetDagRunState,
GetDRCount,
GetPreviousDagRun,
@@ -2942,6 +2944,29 @@ class TestRuntimeTaskInstance:
if hasattr(call.kwargs.get("msg"), "rendered_fields")
)
+ def test_get_dag(self, mock_supervisor_comms):
+ """Test that get_dag sends the correct request and returns the dag."""
+ mock_supervisor_comms.send.return_value = DagResult(
+ dag_id="test_dag",
+ is_paused=False,
+ bundle_name="dags-folder",
+ bundle_version="bundle-version",
+ relative_fileloc="dags/example.py",
+ owners="owner_1",
+ tags=["a_tag", "z_tag"],
+ next_dagrun=datetime(2026, 4, 13, tzinfo=dt_timezone.utc),
+ )
+
+ response = RuntimeTaskInstance.get_dag(
+ dag_id="test_dag",
+ )
+
+ mock_supervisor_comms.send.assert_called_once_with(
+ msg=GetDag(dag_id="test_dag"),
+ )
+ assert response.dag_id == "test_dag"
+ assert response.is_paused is False
+
class TestXComAfterTaskExecution:
@pytest.mark.parametrize(