This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6c9c32710e Migrate the public endpoint Delete DAG to FastAPI (#42914)
6c9c32710e is described below

commit 6c9c32710e1ebbfb244216763239169e5c972d02
Author: Omkar P <45419097+omkar-f...@users.noreply.github.com>
AuthorDate: Tue Oct 15 14:15:51 2024 +0530

    Migrate the public endpoint Delete DAG to FastAPI (#42914)
    
    * Migrate the public endpoint Delete DAG to FastAPI
    
    * Refactor tests
---
 airflow/api_connexion/endpoints/dag_endpoint.py |   2 +
 airflow/api_fastapi/openapi/v1-generated.yaml   |  49 +++++
 airflow/api_fastapi/views/public/dags.py        |  19 +-
 airflow/ui/openapi-gen/queries/common.ts        |   3 +
 airflow/ui/openapi-gen/queries/queries.ts       |  37 ++++
 airflow/ui/openapi-gen/requests/services.gen.ts |  29 +++
 airflow/ui/openapi-gen/requests/types.gen.ts    |  35 ++++
 airflow/utils/api_migration.py                  |   2 +-
 tests/api_fastapi/views/public/test_dags.py     | 242 ++++++++++++++++--------
 9 files changed, 336 insertions(+), 82 deletions(-)

diff --git a/airflow/api_connexion/endpoints/dag_endpoint.py b/airflow/api_connexion/endpoints/dag_endpoint.py
index 3d0d3dd8bf..0352297bff 100644
--- a/airflow/api_connexion/endpoints/dag_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_endpoint.py
@@ -52,6 +52,7 @@ if TYPE_CHECKING:
     from airflow.api_connexion.types import APIResponse, UpdateMask
 
 
+@mark_fastapi_migration_done
 @security.requires_access_dag("GET")
 @provide_session
 def get_dag(
@@ -215,6 +216,7 @@ def patch_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_pat
     return dags_collection_schema.dump(DAGCollection(dags=dags, total_entries=total_entries))
 
 
+@mark_fastapi_migration_done
 @security.requires_access_dag("DELETE")
 @action_logging
 @provide_session
diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml b/airflow/api_fastapi/openapi/v1-generated.yaml
index 235410a6d3..56f48c73e9 100644
--- a/airflow/api_fastapi/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/openapi/v1-generated.yaml
@@ -408,6 +408,55 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
+    delete:
+      tags:
+      - DAG
+      summary: Delete Dag
+      description: Delete the specific DAG.
+      operationId: delete_dag
+      parameters:
+      - name: dag_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Id
+      responses:
+        '200':
+          description: Successful Response
+          content:
+            application/json:
+              schema: {}
+        '400':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Bad Request
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '422':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unprocessable Entity
   /public/dags/{dag_id}/details:
     get:
       tags:
diff --git a/airflow/api_fastapi/views/public/dags.py b/airflow/api_fastapi/views/public/dags.py
index ca0f44162e..eb8233a7f7 100644
--- a/airflow/api_fastapi/views/public/dags.py
+++ b/airflow/api_fastapi/views/public/dags.py
@@ -17,11 +17,12 @@
 
 from __future__ import annotations
 
-from fastapi import Depends, HTTPException, Query, Request
+from fastapi import Depends, HTTPException, Query, Request, Response
 from sqlalchemy import update
 from sqlalchemy.orm import Session
 from typing_extensions import Annotated
 
+from airflow.api.common import delete_dag as delete_dag_module
 from airflow.api_fastapi.db.common import (
     get_session,
     paginated_select,
@@ -48,6 +49,7 @@ from airflow.api_fastapi.serializers.dags import (
     DAGResponse,
 )
 from airflow.api_fastapi.views.router import AirflowRouter
+from airflow.exceptions import AirflowException, DagNotFound
 from airflow.models import DAG, DagModel
 
 dags_router = AirflowRouter(tags=["DAG"], prefix="/dags")
@@ -204,3 +206,18 @@ async def patch_dags(
         dags=[DAGResponse.model_validate(dag, from_attributes=True) for dag in dags],
         total_entries=total_entries,
     )
+
+
+@dags_router.delete("/{dag_id}", responses=create_openapi_http_exception_doc([400, 401, 403, 404, 422]))
+async def delete_dag(
+    dag_id: str,
+    session: Annotated[Session, Depends(get_session)],
+) -> Response:
+    """Delete the specific DAG."""
+    try:
+        delete_dag_module.delete_dag(dag_id, session=session)
+    except DagNotFound:
+        raise HTTPException(404, f"Dag with id: {dag_id} was not found")
+    except AirflowException:
+        raise HTTPException(409, f"Task instances of dag with id: '{dag_id}' are still running")
+    return Response(status_code=204)
diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts
index 426e28447f..2f1c6a78d9 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -197,6 +197,9 @@ export type DagServicePatchDagMutationResult = Awaited<
 export type VariableServicePatchVariableMutationResult = Awaited<
   ReturnType<typeof VariableService.patchVariable>
 >;
+export type DagServiceDeleteDagMutationResult = Awaited<
+  ReturnType<typeof DagService.deleteDag>
+>;
 export type ConnectionServiceDeleteConnectionMutationResult = Awaited<
   ReturnType<typeof ConnectionService.deleteConnection>
 >;
diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts
index 557a7ba8ff..a16bdf165b 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -514,6 +514,43 @@ export const useVariableServicePatchVariable = <
       }) as unknown as Promise<TData>,
     ...options,
   });
+/**
+ * Delete Dag
+ * Delete the specific DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @returns unknown Successful Response
+ * @throws ApiError
+ */
+export const useDagServiceDeleteDag = <
+  TData = Common.DagServiceDeleteDagMutationResult,
+  TError = unknown,
+  TContext = unknown,
+>(
+  options?: Omit<
+    UseMutationOptions<
+      TData,
+      TError,
+      {
+        dagId: string;
+      },
+      TContext
+    >,
+    "mutationFn"
+  >,
+) =>
+  useMutation<
+    TData,
+    TError,
+    {
+      dagId: string;
+    },
+    TContext
+  >({
+    mutationFn: ({ dagId }) =>
+      DagService.deleteDag({ dagId }) as unknown as Promise<TData>,
+    ...options,
+  });
 /**
  * Delete Connection
  * Delete a connection entry.
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts
index 78b113c7f2..8d7f0cee2b 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -15,6 +15,8 @@ import type {
   GetDagResponse,
   PatchDagData,
   PatchDagResponse,
+  DeleteDagData,
+  DeleteDagResponse,
   GetDagDetailsData,
   GetDagDetailsResponse,
   DeleteConnectionData,
@@ -234,6 +236,33 @@ export class DagService {
     });
   }
 
+  /**
+   * Delete Dag
+   * Delete the specific DAG.
+   * @param data The data for the request.
+   * @param data.dagId
+   * @returns unknown Successful Response
+   * @throws ApiError
+   */
+  public static deleteDag(
+    data: DeleteDagData,
+  ): CancelablePromise<DeleteDagResponse> {
+    return __request(OpenAPI, {
+      method: "DELETE",
+      url: "/public/dags/{dag_id}",
+      path: {
+        dag_id: data.dagId,
+      },
+      errors: {
+        400: "Bad Request",
+        401: "Unauthorized",
+        403: "Forbidden",
+        404: "Not Found",
+        422: "Unprocessable Entity",
+      },
+    });
+  }
+
   /**
    * Get Dag Details
    * Get details of DAG.
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts
index 856517d560..7f603a1adb 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -327,6 +327,12 @@ export type PatchDagData = {
 
 export type PatchDagResponse = DAGResponse;
 
+export type DeleteDagData = {
+  dagId: string;
+};
+
+export type DeleteDagResponse = unknown;
+
 export type GetDagDetailsData = {
   dagId: string;
 };
@@ -525,6 +531,35 @@ export type $OpenApiTs = {
         422: HTTPValidationError;
       };
     };
+    delete: {
+      req: DeleteDagData;
+      res: {
+        /**
+         * Successful Response
+         */
+        200: unknown;
+        /**
+         * Bad Request
+         */
+        400: HTTPExceptionResponse;
+        /**
+         * Unauthorized
+         */
+        401: HTTPExceptionResponse;
+        /**
+         * Forbidden
+         */
+        403: HTTPExceptionResponse;
+        /**
+         * Not Found
+         */
+        404: HTTPExceptionResponse;
+        /**
+         * Unprocessable Entity
+         */
+        422: HTTPExceptionResponse;
+      };
+    };
   };
   "/public/dags/{dag_id}/details": {
     get: {
diff --git a/airflow/utils/api_migration.py b/airflow/utils/api_migration.py
index d6b61a933d..3e6ba3881c 100644
--- a/airflow/utils/api_migration.py
+++ b/airflow/utils/api_migration.py
@@ -31,7 +31,7 @@ PS = ParamSpec("PS")
 RT = TypeVar("RT")
 
 
-def mark_fastapi_migration_done(function: Callable[PS, RT]) -> Callable[PS, RT]:
+def mark_fastapi_migration_done(function: Callable[..., RT]) -> Callable[..., RT]:
     """
     Mark an endpoint as migrated over to the new FastAPI API.
 
diff --git a/tests/api_fastapi/views/public/test_dags.py b/tests/api_fastapi/views/public/test_dags.py
index cd1809cb70..ab0c54f517 100644
--- a/tests/api_fastapi/views/public/test_dags.py
+++ b/tests/api_fastapi/views/public/test_dags.py
@@ -25,7 +25,7 @@ from airflow.models.dag import DagModel
 from airflow.models.dagrun import DagRun
 from airflow.operators.empty import EmptyOperator
 from airflow.utils.session import provide_session
-from airflow.utils.state import DagRunState
+from airflow.utils.state import DagRunState, TaskInstanceState
 from airflow.utils.types import DagRunType
 from tests_common.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags
 
@@ -36,84 +36,100 @@ DAG1_DISPLAY_NAME = "display1"
 DAG2_ID = "test_dag2"
 DAG2_START_DATE = datetime(2021, 6, 15, tzinfo=timezone.utc)
 DAG3_ID = "test_dag3"
+DAG4_ID = "test_dag4"
+DAG4_DISPLAY_NAME = "display4"
+DAG5_ID = "test_dag5"
+DAG5_DISPLAY_NAME = "display5"
 TASK_ID = "op1"
 UTC_JSON_REPR = "UTC" if pendulum.__version__.startswith("3") else "Timezone('UTC')"
+API_PREFIX = "/public/dags"
+
+
+class TestDagEndpoint:
+    """Common class for /public/dags related unit tests."""
+
+    @staticmethod
+    def _clear_db():
+        clear_db_runs()
+        clear_db_dags()
+        clear_db_serialized_dags()
+
+    def _create_deactivated_paused_dag(self, session=None):
+        dag_model = DagModel(
+            dag_id=DAG3_ID,
+            fileloc="/tmp/dag_del_1.py",
+            timetable_summary="2 2 * * *",
+            is_active=False,
+            is_paused=True,
+            owners="test_owner,another_test_owner",
+            next_dagrun=datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
+        )
+
+        dagrun_failed = DagRun(
+            dag_id=DAG3_ID,
+            run_id="run1",
+            execution_date=datetime(2018, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
+            start_date=datetime(2018, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
+            run_type=DagRunType.SCHEDULED,
+            state=DagRunState.FAILED,
+        )
+
+        dagrun_success = DagRun(
+            dag_id=DAG3_ID,
+            run_id="run2",
+            execution_date=datetime(2019, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
+            start_date=datetime(2019, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
+            run_type=DagRunType.MANUAL,
+            state=DagRunState.SUCCESS,
+        )
+
+        session.add(dag_model)
+        session.add(dagrun_failed)
+        session.add(dagrun_success)
+
+    @pytest.fixture(autouse=True)
+    @provide_session
+    def setup(self, dag_maker, session=None) -> None:
+        self._clear_db()
+
+        with dag_maker(
+            DAG1_ID,
+            dag_display_name=DAG1_DISPLAY_NAME,
+            schedule=None,
+            start_date=datetime(2018, 6, 15, 0, 0, tzinfo=timezone.utc),
+            doc_md="details",
+            params={"foo": 1},
+            tags=["example"],
+        ):
+            EmptyOperator(task_id=TASK_ID)
+
+        dag_maker.create_dagrun(state=DagRunState.FAILED)
+
+        with dag_maker(
+            DAG2_ID,
+            schedule=None,
+            start_date=DAG2_START_DATE,
+            doc_md="details",
+            params={"foo": 1},
+            max_active_tasks=16,
+            max_active_runs=16,
+        ):
+            EmptyOperator(task_id=TASK_ID)
+
+        self._create_deactivated_paused_dag(session)
+
+        dag_maker.dagbag.sync_to_db()
+        dag_maker.dag_model.has_task_concurrency_limits = True
+        session.merge(dag_maker.dag_model)
+        session.commit()
+
+    def teardown_method(self) -> None:
+        self._clear_db()
+
+
+class TestGetDags(TestDagEndpoint):
+    """Unit tests for Get DAGs."""
 
-
-@provide_session
-def _create_deactivated_paused_dag(session=None):
-    dag_model = DagModel(
-        dag_id=DAG3_ID,
-        fileloc="/tmp/dag_del_1.py",
-        timetable_summary="2 2 * * *",
-        is_active=False,
-        is_paused=True,
-        owners="test_owner,another_test_owner",
-        next_dagrun=datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
-    )
-
-    dagrun_failed = DagRun(
-        dag_id=DAG3_ID,
-        run_id="run1",
-        execution_date=datetime(2018, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
-        start_date=datetime(2018, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
-        run_type=DagRunType.SCHEDULED,
-        state=DagRunState.FAILED,
-    )
-
-    dagrun_success = DagRun(
-        dag_id=DAG3_ID,
-        run_id="run2",
-        execution_date=datetime(2019, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
-        start_date=datetime(2019, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
-        run_type=DagRunType.MANUAL,
-        state=DagRunState.SUCCESS,
-    )
-
-    session.add(dag_model)
-    session.add(dagrun_failed)
-    session.add(dagrun_success)
-
-
-@pytest.fixture(autouse=True)
-@provide_session
-def setup(dag_maker, session=None) -> None:
-    clear_db_runs()
-    clear_db_dags()
-    clear_db_serialized_dags()
-
-    with dag_maker(
-        DAG1_ID,
-        dag_display_name=DAG1_DISPLAY_NAME,
-        schedule=None,
-        start_date=datetime(2018, 6, 15, 0, 0, tzinfo=timezone.utc),
-        doc_md="details",
-        params={"foo": 1},
-        tags=["example"],
-    ):
-        EmptyOperator(task_id=TASK_ID)
-
-    dag_maker.create_dagrun(state=DagRunState.FAILED)
-
-    with dag_maker(
-        DAG2_ID,
-        schedule=None,
-        start_date=DAG2_START_DATE,
-        doc_md="details",
-        params={"foo": 1},
-        max_active_tasks=16,
-        max_active_runs=16,
-    ):
-        EmptyOperator(task_id=TASK_ID)
-
-    dag_maker.dagbag.sync_to_db()
-    dag_maker.dag_model.has_task_concurrency_limits = True
-    session.merge(dag_maker.dag_model)
-    session.commit()
-    _create_deactivated_paused_dag()
-
-
-class TestGetDags:
     @pytest.mark.parametrize(
         "query_params, expected_total_entries, expected_ids",
         [
@@ -161,7 +177,9 @@ class TestGetDags:
         assert [dag["dag_id"] for dag in body["dags"]] == expected_ids
 
 
-class TestPatchDag:
+class TestPatchDag(TestDagEndpoint):
+    """Unit tests for Patch DAG."""
+
     @pytest.mark.parametrize(
         "query_params, dag_id, body, expected_status_code, expected_is_paused",
         [
@@ -184,7 +202,9 @@ class TestPatchDag:
             assert body["is_paused"] == expected_is_paused
 
 
-class TestPatchDags:
+class TestPatchDags(TestDagEndpoint):
+    """Unit tests for Patch DAGs."""
+
     @pytest.mark.parametrize(
         "query_params, body, expected_status_code, expected_ids, expected_paused_ids",
         [
@@ -239,7 +259,9 @@ class TestPatchDags:
             assert paused_dag_ids == expected_paused_ids
 
 
-class TestDagDetails:
+class TestDagDetails(TestDagEndpoint):
+    """Unit tests for DAG Details."""
+
     @pytest.mark.parametrize(
         "query_params, dag_id, expected_status_code, dag_display_name, start_date",
         [
@@ -312,7 +334,9 @@ class TestDagDetails:
         assert res_json == expected
 
 
-class TestGetDag:
+class TestGetDag(TestDagEndpoint):
+    """Unit tests for Get DAG."""
+
     @pytest.mark.parametrize(
         "query_params, dag_id, expected_status_code, dag_display_name",
         [
@@ -359,3 +383,61 @@ class TestGetDag:
             "pickle_id": None,
         }
         assert res_json == expected
+
+
+class TestDeleteDAG(TestDagEndpoint):
+    """Unit tests for Delete DAG."""
+
+    def _create_dag_for_deletion(
+        self,
+        dag_maker,
+        dag_id=None,
+        dag_display_name=None,
+        has_running_dagruns=False,
+    ):
+        with dag_maker(
+            dag_id,
+            dag_display_name=dag_display_name,
+            start_date=datetime(2024, 10, 10, tzinfo=timezone.utc),
+        ):
+            EmptyOperator(task_id="dummy")
+
+        if has_running_dagruns:
+            dr = dag_maker.create_dagrun()
+            ti = dr.get_task_instances()[0]
+            ti.set_state(TaskInstanceState.RUNNING)
+
+        dag_maker.dagbag.sync_to_db()
+
+    @pytest.mark.parametrize(
+        "dag_id, dag_display_name, status_code_delete, status_code_details, has_running_dagruns, is_create_dag",
+        [
+            ("test_nonexistent_dag_id", "nonexistent_display_name", 404, 404, False, False),
+            (DAG4_ID, DAG4_DISPLAY_NAME, 204, 404, False, True),
+            (DAG5_ID, DAG5_DISPLAY_NAME, 409, 200, True, True),
+        ],
+    )
+    def test_delete_dag(
+        self,
+        dag_maker,
+        test_client,
+        dag_id,
+        dag_display_name,
+        status_code_delete,
+        status_code_details,
+        has_running_dagruns,
+        is_create_dag,
+    ):
+        if is_create_dag:
+            self._create_dag_for_deletion(
+                dag_maker=dag_maker,
+                dag_id=dag_id,
+                dag_display_name=dag_display_name,
+                has_running_dagruns=has_running_dagruns,
+            )
+
+        delete_response = test_client.delete(f"{API_PREFIX}/{dag_id}")
+        assert delete_response.status_code == status_code_delete
+
+        details_response = test_client.get(f"{API_PREFIX}/{dag_id}/details")
+        assert details_response.status_code == status_code_details

Reply via email to