This is an automated email from the ASF dual-hosted git repository.

ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6c3bc91f42e Follow-up SLA purge (#42808)
6c3bc91f42e is described below

commit 6c3bc91f42e3617824aef5127d37b2b8280babe6
Author: D. Ferruzzi <ferru...@amazon.com>
AuthorDate: Tue Oct 15 16:53:11 2024 -0700

    Follow-up SLA purge (#42808)
    
    Co-authored-by: dstandish <dstand...@users.noreply.github.com>
---
 .../endpoints/task_instance_endpoint.py            |  96 +++--------------
 airflow/api_connexion/schemas/sla_miss_schema.py   |  38 -------
 .../api_connexion/schemas/task_instance_schema.py  |  21 +---
 airflow/example_dags/tutorial.py                   |   1 -
 airflow/models/__init__.py                         |   3 -
 airflow/models/slamiss.py                          |  46 ---------
 airflow/notifications/basenotifier.py              |   8 +-
 airflow/utils/context.py                           |   3 +
 airflow/www/auth.py                                |   4 +-
 airflow/www/extensions/init_views.py               |   3 -
 airflow/www/views.py                               | 115 +--------------------
 .../api_endpoints/test_task_instance_endpoint.py   |   3 +-
 providers/tests/smtp/notifications/test_smtp.py    |  33 ------
 .../test_mapped_task_instance_endpoint.py          |   4 +-
 .../endpoints/test_task_instance_endpoint.py       |  29 +-----
 .../schemas/test_task_instance_schema.py           |  64 +-----------
 tests/jobs/test_scheduler_job.py                   |   3 -
 tests/listeners/test_dag_import_error_listener.py  |   2 -
 tests/notifications/test_basenotifier.py           |   1 -
 tests_common/test_utils/db.py                      |   7 --
 20 files changed, 34 insertions(+), 450 deletions(-)

diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py
index 23e5f5a6d16..0e98173f68a 100644
--- a/airflow/api_connexion/endpoints/task_instance_endpoint.py
+++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py
@@ -20,7 +20,7 @@ from typing import TYPE_CHECKING, Any, Iterable, Sequence, TypeVar
 
 from flask import g
 from marshmallow import ValidationError
-from sqlalchemy import and_, or_, select
+from sqlalchemy import or_, select
 from sqlalchemy.exc import MultipleResultsFound
 from sqlalchemy.orm import joinedload
 
@@ -48,7 +48,6 @@ from airflow.api_connexion.schemas.task_instance_schema import (
 from airflow.api_connexion.security import get_readable_dags
 from airflow.auth.managers.models.resource_details import DagAccessEntity, DagDetails
 from airflow.exceptions import TaskNotFound
-from airflow.models import SlaMiss
 from airflow.models.dagrun import DagRun as DR
 from airflow.models.taskinstance import TaskInstance as TI, clear_task_instances
 from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH
@@ -84,27 +83,18 @@ def get_task_instance(
         select(TI)
         .where(TI.dag_id == dag_id, TI.run_id == dag_run_id, TI.task_id == task_id)
         .join(TI.dag_run)
-        .outerjoin(
-            SlaMiss,
-            and_(
-                SlaMiss.dag_id == TI.dag_id,
-                SlaMiss.execution_date == DR.execution_date,
-                SlaMiss.task_id == TI.task_id,
-            ),
-        )
-        .add_columns(SlaMiss)
         .options(joinedload(TI.rendered_task_instance_fields))
     )
 
     try:
-        task_instance = session.execute(query).one_or_none()
+        task_instance = session.scalar(query)
     except MultipleResultsFound:
         raise NotFound(
             "Task instance not found", detail="Task instance is mapped, add the map_index value to the URL"
         )
     if task_instance is None:
         raise NotFound("Task instance not found")
-    if task_instance[0].map_index != -1:
+    if task_instance.map_index != -1:
         raise NotFound(
             "Task instance not found", detail="Task instance is mapped, add the map_index value to the URL"
         )
@@ -127,18 +117,9 @@ def get_mapped_task_instance(
         select(TI)
         .where(TI.dag_id == dag_id, TI.run_id == dag_run_id, TI.task_id == task_id, TI.map_index == map_index)
         .join(TI.dag_run)
-        .outerjoin(
-            SlaMiss,
-            and_(
-                SlaMiss.dag_id == TI.dag_id,
-                SlaMiss.execution_date == DR.execution_date,
-                SlaMiss.task_id == TI.task_id,
-            ),
-        )
-        .add_columns(SlaMiss)
         .options(joinedload(TI.rendered_task_instance_fields))
     )
-    task_instance = session.execute(query).one_or_none()
+    task_instance = session.scalar(query)
 
     if task_instance is None:
         raise NotFound("Task instance not found")
@@ -232,28 +213,13 @@ def get_mapped_task_instances(
     # Count elements before joining extra columns
     total_entries = get_query_count(base_query, session=session)
 
-    # Add SLA miss
-    entry_query = (
-        base_query.outerjoin(
-            SlaMiss,
-            and_(
-                SlaMiss.dag_id == TI.dag_id,
-                SlaMiss.task_id == TI.task_id,
-                SlaMiss.execution_date == DR.execution_date,
-            ),
-        )
-        .add_columns(SlaMiss)
-        .options(joinedload(TI.rendered_task_instance_fields))
-    )
-
     try:
         order_by_params = _get_order_by_params(order_by)
-        entry_query = entry_query.order_by(*order_by_params)
+        entry_query = base_query.order_by(*order_by_params)
     except _UnsupportedOrderBy as e:
         raise BadRequest(detail=f"Ordering with {e.order_by!r} is not supported")
 
-    # using execute because we want the SlaMiss entity. Scalars don't return None for missing entities
-    task_instances = session.execute(entry_query.offset(offset).limit(limit)).all()
+    task_instances = session.scalars(entry_query.offset(offset).limit(limit))
     return task_instance_collection_schema.dump(
         TaskInstanceCollection(task_instances=task_instances, total_entries=total_entries)
     )
@@ -384,28 +350,13 @@ def get_task_instances(
     # Count elements before joining extra columns
     total_entries = get_query_count(base_query, session=session)
 
-    # Add join
-    entry_query = (
-        base_query.outerjoin(
-            SlaMiss,
-            and_(
-                SlaMiss.dag_id == TI.dag_id,
-                SlaMiss.task_id == TI.task_id,
-                SlaMiss.execution_date == DR.execution_date,
-            ),
-        )
-        .add_columns(SlaMiss)
-        .options(joinedload(TI.rendered_task_instance_fields))
-    )
-
     try:
         order_by_params = _get_order_by_params(order_by)
-        entry_query = entry_query.order_by(*order_by_params)
+        entry_query = base_query.order_by(*order_by_params)
     except _UnsupportedOrderBy as e:
         raise BadRequest(detail=f"Ordering with {e.order_by!r} is not supported")
 
-    # using execute because we want the SlaMiss entity. Scalars don't return None for missing entities
-    task_instances = session.execute(entry_query.offset(offset).limit(limit)).all()
+    task_instances = session.scalars(entry_query.offset(offset).limit(limit))
     return task_instance_collection_schema.dump(
         TaskInstanceCollection(task_instances=task_instances, total_entries=total_entries)
     )
@@ -463,16 +414,7 @@ def get_task_instances_batch(session: Session = NEW_SESSION) -> APIResponse:
 
     # Count elements before joining extra columns
     total_entries = get_query_count(base_query, session=session)
-    # Add join
-    base_query = base_query.join(
-        SlaMiss,
-        and_(
-            SlaMiss.dag_id == TI.dag_id,
-            SlaMiss.task_id == TI.task_id,
-            SlaMiss.execution_date == DR.execution_date,
-        ),
-        isouter=True,
-    ).add_columns(SlaMiss)
+
     ti_query = base_query.options(
         joinedload(TI.rendered_task_instance_fields), joinedload(TI.task_instance_note)
     )
@@ -483,8 +425,7 @@ def get_task_instances_batch(session: Session = NEW_SESSION) -> APIResponse:
     except _UnsupportedOrderBy as e:
         raise BadRequest(detail=f"Ordering with {e.order_by!r} is not supported")
 
-    # using execute because we want the SlaMiss entity. Scalars don't return None for missing entities
-    task_instances = session.execute(ti_query).all()
+    task_instances = session.scalars(ti_query)
 
     return task_instance_collection_schema.dump(
         TaskInstanceCollection(task_instances=task_instances, total_entries=total_entries)
@@ -690,15 +631,6 @@ def set_task_instance_note(
         select(TI)
         .where(TI.dag_id == dag_id, TI.run_id == dag_run_id, TI.task_id == task_id)
         .join(TI.dag_run)
-        .outerjoin(
-            SlaMiss,
-            and_(
-                SlaMiss.dag_id == TI.dag_id,
-                SlaMiss.execution_date == DR.execution_date,
-                SlaMiss.task_id == TI.task_id,
-            ),
-        )
-        .add_columns(SlaMiss)
         .options(joinedload(TI.rendered_task_instance_fields))
     )
     if map_index == -1:
@@ -707,17 +639,15 @@ def set_task_instance_note(
         query = query.where(TI.map_index == map_index)
 
     try:
-        result = session.execute(query).one_or_none()
+        ti = session.scalar(query)
     except MultipleResultsFound:
         raise NotFound(
             "Task instance not found", detail="Task instance is mapped, add the map_index value to the URL"
         )
-    if result is None:
+    if ti is None:
         error_message = f"Task Instance not found for dag_id={dag_id}, run_id={dag_run_id}, task_id={task_id}"
         raise NotFound(error_message)
 
-    ti, sla_miss = result
-
     current_user_id = get_auth_manager().get_user_id()
     if ti.task_instance_note is None:
         ti.note = (new_note, current_user_id)
@@ -725,7 +655,7 @@ def set_task_instance_note(
         ti.task_instance_note.content = new_note
         ti.task_instance_note.user_id = current_user_id
     session.commit()
-    return task_instance_schema.dump((ti, sla_miss))
+    return task_instance_schema.dump(ti)
 
 
 @security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
diff --git a/airflow/api_connexion/schemas/sla_miss_schema.py 
b/airflow/api_connexion/schemas/sla_miss_schema.py
deleted file mode 100644
index 97a462e186d..00000000000
--- a/airflow/api_connexion/schemas/sla_miss_schema.py
+++ /dev/null
@@ -1,38 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field
-
-from airflow.models import SlaMiss
-
-
-class SlaMissSchema(SQLAlchemySchema):
-    """Sla Miss Schema."""
-
-    class Meta:
-        """Meta."""
-
-        model = SlaMiss
-
-    task_id = auto_field(dump_only=True)
-    dag_id = auto_field(dump_only=True)
-    execution_date = auto_field(dump_only=True)
-    email_sent = auto_field(dump_only=True)
-    timestamp = auto_field(dump_only=True)
-    description = auto_field(dump_only=True)
-    notification_sent = auto_field(dump_only=True)
diff --git a/airflow/api_connexion/schemas/task_instance_schema.py 
b/airflow/api_connexion/schemas/task_instance_schema.py
index 0c1daf6ce2c..f0b8285fdfa 100644
--- a/airflow/api_connexion/schemas/task_instance_schema.py
+++ b/airflow/api_connexion/schemas/task_instance_schema.py
@@ -16,7 +16,7 @@
 # under the License.
 from __future__ import annotations
 
-from typing import TYPE_CHECKING, NamedTuple
+from typing import NamedTuple
 
 from marshmallow import Schema, ValidationError, fields, validate, 
validates_schema
 from marshmallow.utils import get_value
@@ -26,16 +26,12 @@ from airflow.api_connexion.parameters import 
validate_istimezone
 from airflow.api_connexion.schemas.common_schema import JsonObjectField
 from airflow.api_connexion.schemas.enum_schemas import TaskInstanceStateField
 from airflow.api_connexion.schemas.job_schema import JobSchema
-from airflow.api_connexion.schemas.sla_miss_schema import SlaMissSchema
 from airflow.api_connexion.schemas.trigger_schema import TriggerSchema
 from airflow.models import TaskInstance
 from airflow.models.taskinstancehistory import TaskInstanceHistory
 from airflow.utils.helpers import exactly_one
 from airflow.utils.state import TaskInstanceState
 
-if TYPE_CHECKING:
-    from airflow.models import SlaMiss
-
 
 class TaskInstanceSchema(SQLAlchemySchema):
     """Task instance schema."""
@@ -69,22 +65,15 @@ class TaskInstanceSchema(SQLAlchemySchema):
     executor = auto_field()
     executor_config = auto_field()
     note = auto_field()
-    sla_miss = fields.Nested(SlaMissSchema, dump_default=None)
     rendered_map_index = auto_field()
     rendered_fields = JsonObjectField(dump_default={})
     trigger = fields.Nested(TriggerSchema)
     triggerer_job = fields.Nested(JobSchema)
 
     def get_attribute(self, obj, attr, default):
-        if attr == "sla_miss":
-            # Object is a tuple of task_instance and slamiss
-            # and the get_value expects a dict with key, value
-            # corresponding to the attr.
-            slamiss_instance = {"sla_miss": obj[1]}
-            return get_value(slamiss_instance, attr, default)
-        elif attr == "rendered_fields":
-            return get_value(obj[0], 
"rendered_task_instance_fields.rendered_fields", default)
-        return get_value(obj[0], attr, default)
+        if attr == "rendered_fields":
+            return get_value(obj, 
"rendered_task_instance_fields.rendered_fields", default)
+        return get_value(obj, attr, default)
 
 
 class TaskInstanceHistorySchema(SQLAlchemySchema):
@@ -122,7 +111,7 @@ class TaskInstanceHistorySchema(SQLAlchemySchema):
 class TaskInstanceCollection(NamedTuple):
     """List of task instances with metadata."""
 
-    task_instances: list[tuple[TaskInstance, SlaMiss | None]]
+    task_instances: list[TaskInstance | None]
     total_entries: int
 
 
diff --git a/airflow/example_dags/tutorial.py b/airflow/example_dags/tutorial.py
index 6e27bbcd2e5..4e55ab7ff15 100644
--- a/airflow/example_dags/tutorial.py
+++ b/airflow/example_dags/tutorial.py
@@ -55,7 +55,6 @@ with DAG(
         # 'priority_weight': 10,
         # 'end_date': datetime(2016, 1, 1),
         # 'wait_for_downstream': False,
-        # 'sla': timedelta(hours=2),
         # 'execution_timeout': timedelta(seconds=300),
         # 'on_failure_callback': some_function, # or list of functions
         # 'on_success_callback': some_other_function, # or list of functions
diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py
index 375761bc20f..1a998d60c5c 100644
--- a/airflow/models/__init__.py
+++ b/airflow/models/__init__.py
@@ -41,7 +41,6 @@ __all__ = [
     "Pool",
     "RenderedTaskInstanceFields",
     "SkipMixin",
-    "SlaMiss",
     "TaskFail",
     "TaskInstance",
     "TaskReschedule",
@@ -104,7 +103,6 @@ __lazy_imports = {
     "Pool": "airflow.models.pool",
     "RenderedTaskInstanceFields": "airflow.models.renderedtifields",
     "SkipMixin": "airflow.models.skipmixin",
-    "SlaMiss": "airflow.models.slamiss",
     "TaskFail": "airflow.models.taskfail",
     "TaskInstance": "airflow.models.taskinstance",
     "TaskReschedule": "airflow.models.taskreschedule",
@@ -134,7 +132,6 @@ if TYPE_CHECKING:
     from airflow.models.pool import Pool
     from airflow.models.renderedtifields import RenderedTaskInstanceFields
     from airflow.models.skipmixin import SkipMixin
-    from airflow.models.slamiss import SlaMiss
     from airflow.models.taskfail import TaskFail
     from airflow.models.taskinstance import TaskInstance, clear_task_instances
     from airflow.models.taskinstancehistory import TaskInstanceHistory
diff --git a/airflow/models/slamiss.py b/airflow/models/slamiss.py
deleted file mode 100644
index 4fb7e53a17b..00000000000
--- a/airflow/models/slamiss.py
+++ /dev/null
@@ -1,46 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from sqlalchemy import Boolean, Column, Index, String, Text
-
-from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
-from airflow.utils.sqlalchemy import UtcDateTime
-
-
-class SlaMiss(Base):
-    """
-    Model that stores a history of the SLA that have been missed.
-
-    It is used to keep track of SLA failures over time and to avoid double 
triggering alert emails.
-    """
-
-    __tablename__ = "sla_miss"
-
-    task_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
-    dag_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
-    execution_date = Column(UtcDateTime, primary_key=True)
-    email_sent = Column(Boolean, default=False)
-    timestamp = Column(UtcDateTime)
-    description = Column(Text)
-    notification_sent = Column(Boolean, default=False)
-
-    __table_args__ = (Index("sm_dag", dag_id, unique=False),)
-
-    def __repr__(self):
-        return str((self.dag_id, self.task_id, 
self.execution_date.isoformat()))
diff --git a/airflow/notifications/basenotifier.py 
b/airflow/notifications/basenotifier.py
index 91fef87167c..11d7ee17fe9 100644
--- a/airflow/notifications/basenotifier.py
+++ b/airflow/notifications/basenotifier.py
@@ -87,10 +87,6 @@ class BaseNotifier(Templater):
 
         :param context: The airflow context
         """
-        # Currently, there are two ways a callback is invoked
-        # 1. callback(context) - for on_*_callbacks
-        # 2. callback(dag, task_list, blocking_task_list, slas, blocking_tis) 
- for sla_miss_callback
-        # we have to distinguish between the two calls so that we can prepare 
the correct context,
         if len(args) == 1:
             context = args[0]
         else:
@@ -98,9 +94,9 @@ class BaseNotifier(Templater):
                 "dag": args[0],
                 "task_list": args[1],
                 "blocking_task_list": args[2],
-                "slas": args[3],
-                "blocking_tis": args[4],
+                "blocking_tis": args[3],
             }
+
         self._update_context(context)
         self.render_template_fields(context)
         try:
diff --git a/airflow/utils/context.py b/airflow/utils/context.py
index e5d30b1e2d7..4f308a6f9e0 100644
--- a/airflow/utils/context.py
+++ b/airflow/utils/context.py
@@ -418,6 +418,9 @@ def context_merge(context: Context, *args: Any, **kwargs: 
Any) -> None:
 
     :meta private:
     """
+    if not context:
+        context = Context()
+
     context.update(*args, **kwargs)
 
 
diff --git a/airflow/www/auth.py b/airflow/www/auth.py
index 74f31d135c1..d4b8ad619e6 100644
--- a/airflow/www/auth.py
+++ b/airflow/www/auth.py
@@ -49,7 +49,7 @@ if TYPE_CHECKING:
         IsAuthorizedPoolRequest,
         IsAuthorizedVariableRequest,
     )
-    from airflow.models import DagRun, Pool, SlaMiss, TaskInstance, Variable
+    from airflow.models import DagRun, Pool, TaskInstance, Variable
     from airflow.models.connection import Connection
     from airflow.models.xcom import BaseXCom
 
@@ -239,7 +239,7 @@ def has_access_dag_entities(method: ResourceMethod, 
access_entity: DagAccessEnti
     def has_access_decorator(func: T):
         @wraps(func)
         def decorated(*args, **kwargs):
-            items: set[SlaMiss | BaseXCom | DagRun | TaskInstance] = 
set(args[1])
+            items: set[BaseXCom | DagRun | TaskInstance] = set(args[1])
             requests: Sequence[IsAuthorizedDagRequest] = [
                 {
                     "method": method,
diff --git a/airflow/www/extensions/init_views.py 
b/airflow/www/extensions/init_views.py
index a04116a7c7d..1b0eddf5542 100644
--- a/airflow/www/extensions/init_views.py
+++ b/airflow/www/extensions/init_views.py
@@ -103,9 +103,6 @@ def init_appbuilder_views(app):
     appbuilder.add_view(
         views.ConnectionModelView, permissions.RESOURCE_CONNECTION, 
category=permissions.RESOURCE_ADMIN_MENU
     )
-    appbuilder.add_view(
-        views.SlaMissModelView, permissions.RESOURCE_SLA_MISS, 
category=permissions.RESOURCE_BROWSE_MENU
-    )
     appbuilder.add_view(
         views.PluginView, permissions.RESOURCE_PLUGIN, 
category=permissions.RESOURCE_ADMIN_MENU
     )
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 23ccae62249..1751bb33de3 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -102,7 +102,7 @@ from airflow.hooks.base import BaseHook
 from airflow.jobs.job import Job
 from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
 from airflow.jobs.triggerer_job_runner import TriggererJobRunner
-from airflow.models import Connection, DagModel, DagTag, Log, SlaMiss, 
Trigger, XCom
+from airflow.models import Connection, DagModel, DagTag, Log, Trigger, XCom
 from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel, 
DagScheduleAssetReference
 from airflow.models.dag import get_asset_triggered_next_run_info
 from airflow.models.dagrun import RUN_ID_REGEX, DagRun, DagRunType
@@ -3876,119 +3876,6 @@ class AirflowModelView(ModelView):
         return action.func(items)
 
 
-class SlaMissModelView(AirflowModelView):
-    """View to show SlaMiss table."""
-
-    route_base = "/slamiss"
-
-    datamodel = AirflowModelView.CustomSQLAInterface(SlaMiss)  # type: ignore
-
-    class_permission_name = permissions.RESOURCE_SLA_MISS
-    method_permission_name = {
-        "list": "read",
-        "action_muldelete": "delete",
-        "action_mulnotificationsent": "edit",
-        "action_mulnotificationsentfalse": "edit",
-        "action_mulemailsent": "edit",
-        "action_mulemailsentfalse": "edit",
-    }
-
-    base_permissions = [
-        permissions.ACTION_CAN_READ,
-        permissions.ACTION_CAN_ACCESS_MENU,
-    ]
-
-    list_columns = ["dag_id", "task_id", "execution_date", "email_sent", 
"notification_sent", "timestamp"]
-
-    label_columns = {
-        "execution_date": "Logical Date",
-    }
-
-    add_columns = ["dag_id", "task_id", "execution_date", "email_sent", 
"notification_sent", "timestamp"]
-    edit_columns = ["dag_id", "task_id", "execution_date", "email_sent", 
"notification_sent", "timestamp"]
-    search_columns = ["dag_id", "task_id", "email_sent", "notification_sent", 
"timestamp", "execution_date"]
-    base_order = ("execution_date", "desc")
-    base_filters = [["dag_id", DagFilter, list]]
-
-    formatters_columns = {
-        "task_id": wwwutils.task_instance_link,
-        "execution_date": wwwutils.datetime_f("execution_date"),
-        "timestamp": wwwutils.datetime_f("timestamp"),
-        "dag_id": wwwutils.dag_link,
-        "map_index": wwwutils.format_map_index,
-    }
-
-    @action("muldelete", "Delete", "Are you sure you want to delete selected 
records?", single=False)
-    @auth.has_access_dag_entities("DELETE", DagAccessEntity.SLA_MISS)
-    def action_muldelete(self, items):
-        """Multiple delete action."""
-        self.datamodel.delete_all(items)
-        self.update_redirect()
-        return redirect(self.get_redirect())
-
-    @action(
-        "mulnotificationsent",
-        "Set notification sent to true",
-        "Are you sure you want to set all these notifications to sent?",
-        single=False,
-    )
-    @auth.has_access_dag_entities("PUT", DagAccessEntity.SLA_MISS)
-    def action_mulnotificationsent(self, items: list[SlaMiss]):
-        return self._set_notification_property(items, "notification_sent", 
True)
-
-    @action(
-        "mulnotificationsentfalse",
-        "Set notification sent to false",
-        "Are you sure you want to mark these SLA alerts as notification not 
sent yet?",
-        single=False,
-    )
-    @auth.has_access_dag_entities("PUT", DagAccessEntity.SLA_MISS)
-    def action_mulnotificationsentfalse(self, items: list[SlaMiss]):
-        return self._set_notification_property(items, "notification_sent", 
False)
-
-    @action(
-        "mulemailsent",
-        "Set email sent to true",
-        "Are you sure you want to mark these SLA alerts as emails were sent?",
-        single=False,
-    )
-    @auth.has_access_dag_entities("PUT", DagAccessEntity.SLA_MISS)
-    def action_mulemailsent(self, items: list[SlaMiss]):
-        return self._set_notification_property(items, "email_sent", True)
-
-    @action(
-        "mulemailsentfalse",
-        "Set email sent to false",
-        "Are you sure you want to mark these SLA alerts as emails not sent 
yet?",
-        single=False,
-    )
-    @auth.has_access_dag_entities("PUT", DagAccessEntity.SLA_MISS)
-    def action_mulemailsentfalse(self, items: list[SlaMiss]):
-        return self._set_notification_property(items, "email_sent", False)
-
-    @provide_session
-    def _set_notification_property(
-        self,
-        items: list[SlaMiss],
-        attr: str,
-        new_value: bool,
-        session: Session = NEW_SESSION,
-    ):
-        try:
-            count = 0
-            for sla in items:
-                count += 1
-                setattr(sla, attr, new_value)
-                session.merge(sla)
-            session.commit()
-            flash(f"{count} SLAMisses had {attr} set to {new_value}.")
-        except Exception as ex:
-            flash(str(ex), "error")
-            flash("Failed to set state", "error")
-        self.update_redirect()
-        return redirect(self.get_default_url())
-
-
 class XComModelView(AirflowModelView):
     """View to show records from XCom table."""
 
diff --git 
a/providers/tests/fab/auth_manager/api_endpoints/test_task_instance_endpoint.py 
b/providers/tests/fab/auth_manager/api_endpoints/test_task_instance_endpoint.py
index 6aea6b86f7b..f0063212c69 100644
--- 
a/providers/tests/fab/auth_manager/api_endpoints/test_task_instance_endpoint.py
+++ 
b/providers/tests/fab/auth_manager/api_endpoints/test_task_instance_endpoint.py
@@ -21,7 +21,7 @@ import urllib
 
 import pytest
 from tests_common.test_utils.compat import AIRFLOW_V_3_0_PLUS
-from tests_common.test_utils.db import clear_db_runs, clear_db_sla_miss, 
clear_rendered_ti_fields
+from tests_common.test_utils.db import clear_db_runs, clear_rendered_ti_fields
 
 from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
 from airflow.models import DagRun, TaskInstance
@@ -124,7 +124,6 @@ class TestTaskInstanceEndpoint:
         self.app = configured_app
         self.client = self.app.test_client()  # type:ignore
         clear_db_runs()
-        clear_db_sla_miss()
         clear_rendered_ti_fields()
         self.dagbag = dagbag
 
diff --git a/providers/tests/smtp/notifications/test_smtp.py 
b/providers/tests/smtp/notifications/test_smtp.py
index 594fa19dfee..95b1618aa4f 100644
--- a/providers/tests/smtp/notifications/test_smtp.py
+++ b/providers/tests/smtp/notifications/test_smtp.py
@@ -25,7 +25,6 @@ from tests_common.test_utils.compat import AIRFLOW_V_2_10_PLUS
 from tests_common.test_utils.config import conf_vars
 
 from airflow.configuration import conf
-from airflow.models import SlaMiss
 from airflow.operators.empty import EmptyOperator
 from airflow.providers.smtp.hooks.smtp import SmtpHook
 from airflow.providers.smtp.notifications.smtp import (
@@ -145,38 +144,6 @@ class TestSmtpNotifier:
         content = 
mock_smtphook_hook.return_value.__enter__().send_email_smtp.call_args.kwargs["html_content"]
         assert f"{NUM_TRY} of 1" in content
 
-    @mock.patch("airflow.providers.smtp.notifications.smtp.SmtpHook")
-    def test_notifier_with_defaults_sla(self, mock_smtphook_hook, dag_maker):
-        with dag_maker("test_notifier") as dag:
-            EmptyOperator(task_id="task1")
-        context = {
-            "dag": dag,
-            "slas": [SlaMiss(task_id="op", dag_id=dag.dag_id, 
execution_date=timezone.datetime(2018, 1, 1))],
-            "task_list": [],
-            "blocking_task_list": [],
-            "blocking_tis": [],
-        }
-        notifier = SmtpNotifier(
-            from_email=conf.get("smtp", "smtp_mail_from"),
-            to="test_reci...@test.com",
-        )
-        notifier(context)
-        
mock_smtphook_hook.return_value.__enter__().send_email_smtp.assert_called_once_with(
-            from_email=conf.get("smtp", "smtp_mail_from"),
-            to="test_reci...@test.com",
-            subject="SLA Missed for DAG test_notifier - Task op",
-            html_content=mock.ANY,
-            smtp_conn_id="smtp_default",
-            files=None,
-            cc=None,
-            bcc=None,
-            mime_subtype="mixed",
-            mime_charset="utf-8",
-            custom_headers=None,
-        )
-        content = 
mock_smtphook_hook.return_value.__enter__().send_email_smtp.call_args.kwargs["html_content"]
-        assert "Task List:" in content
-
     @mock.patch("airflow.providers.smtp.notifications.smtp.SmtpHook")
     def test_notifier_with_nondefault_conf_vars(self, mock_smtphook_hook, 
create_task_instance):
         ti = create_task_instance(dag_id="dag", task_id="op", 
execution_date=timezone.datetime(2018, 1, 1))
diff --git 
a/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py 
b/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py
index 843de41f1b4..80fdd72726f 100644
--- a/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py
+++ b/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py
@@ -33,7 +33,7 @@ from airflow.utils.session import provide_session
 from airflow.utils.state import State, TaskInstanceState
 from airflow.utils.timezone import datetime
 from tests_common.test_utils.api_connexion_utils import assert_401, 
create_user, delete_user
-from tests_common.test_utils.db import clear_db_runs, clear_db_sla_miss, 
clear_rendered_ti_fields
+from tests_common.test_utils.db import clear_db_runs, clear_rendered_ti_fields
 from tests_common.test_utils.mock_operators import MockOperator
 
 pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]
@@ -81,7 +81,6 @@ class TestMappedTaskInstanceEndpoint:
         self.app = configured_app
         self.client = self.app.test_client()  # type:ignore
         clear_db_runs()
-        clear_db_sla_miss()
         clear_rendered_ti_fields()
 
     def create_dag_runs_with_mapped_tasks(self, dag_maker, session, dags=None):
@@ -227,7 +226,6 @@ class TestGetMappedTaskInstance(TestMappedTaskInstanceEndpoint):
             "queued_when": None,
             "rendered_fields": {},
             "rendered_map_index": None,
-            "sla_miss": None,
             "start_date": "2020-01-01T00:00:00+00:00",
             "state": "success",
             "task_id": "task_2",
diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
index bec52292fbe..57467402055 100644
--- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py
+++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
@@ -27,7 +27,7 @@ from sqlalchemy.orm import contains_eager
 
 from airflow.jobs.job import Job
 from airflow.jobs.triggerer_job_runner import TriggererJobRunner
-from airflow.models import DagRun, SlaMiss, TaskInstance, Trigger
+from airflow.models import DagRun, TaskInstance, Trigger
 from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF
 from airflow.models.taskinstancehistory import TaskInstanceHistory
 from airflow.utils.platform import getuser
@@ -36,7 +36,7 @@ from airflow.utils.state import State
 from airflow.utils.timezone import datetime
 from airflow.utils.types import DagRunType
 from tests_common.test_utils.api_connexion_utils import assert_401, create_user, delete_user
-from tests_common.test_utils.db import clear_db_runs, clear_db_sla_miss, clear_rendered_ti_fields
+from tests_common.test_utils.db import clear_db_runs, clear_rendered_ti_fields
 from tests_common.test_utils.www import _check_last_log
 
 pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]
@@ -85,7 +85,6 @@ class TestTaskInstanceEndpoint:
         self.app = configured_app
         self.client = self.app.test_client()  # type:ignore
         clear_db_runs()
-        clear_db_sla_miss()
         clear_rendered_ti_fields()
         self.dagbag = dagbag
 
@@ -197,7 +196,6 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
             "priority_weight": 9,
             "queue": "default_queue",
             "queued_when": None,
-            "sla_miss": None,
             "start_date": "2020-01-02T00:00:00+00:00",
             "state": "running",
             "task_id": "print_the_context",
@@ -256,7 +254,6 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
             "priority_weight": 9,
             "queue": "default_queue",
             "queued_when": None,
-            "sla_miss": None,
             "start_date": "2020-01-02T00:00:00+00:00",
             "state": "deferred",
             "task_id": "print_the_context",
@@ -304,7 +301,6 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
             "priority_weight": 9,
             "queue": "default_queue",
             "queued_when": None,
-            "sla_miss": None,
             "start_date": "2020-01-02T00:00:00+00:00",
             "state": "removed",
             "task_id": "print_the_context",
@@ -318,16 +314,9 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
             "triggerer_job": None,
         }
 
-    def test_should_respond_200_task_instance_with_sla_and_rendered(self, session):
+    def test_should_respond_200_task_instance_with_rendered(self, session):
         tis = self.create_task_instances(session)
         session.query()
-        sla_miss = SlaMiss(
-            task_id="print_the_context",
-            dag_id="example_python_operator",
-            execution_date=self.default_time,
-            timestamp=self.default_time,
-        )
-        session.add(sla_miss)
         rendered_fields = RTIF(tis[0], render_templates=False)
         session.add(rendered_fields)
         session.commit()
@@ -355,15 +344,6 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
             "priority_weight": 9,
             "queue": "default_queue",
             "queued_when": None,
-            "sla_miss": {
-                "dag_id": "example_python_operator",
-                "description": None,
-                "email_sent": False,
-                "execution_date": "2020-01-01T00:00:00+00:00",
-                "notification_sent": False,
-                "task_id": "print_the_context",
-                "timestamp": "2020-01-01T00:00:00+00:00",
-            },
             "start_date": "2020-01-02T00:00:00+00:00",
             "state": "running",
             "task_id": "print_the_context",
@@ -416,7 +396,6 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
                 "priority_weight": 9,
                 "queue": "default_queue",
                 "queued_when": None,
-                "sla_miss": None,
                 "start_date": "2020-01-02T00:00:00+00:00",
                 "state": "running",
                 "task_id": "print_the_context",
@@ -2320,7 +2299,6 @@ class TestSetTaskInstanceNote(TestTaskInstanceEndpoint):
             "priority_weight": 9,
             "queue": "default_queue",
             "queued_when": None,
-            "sla_miss": None,
             "start_date": "2020-01-02T00:00:00+00:00",
             "state": "running",
             "task_id": "print_the_context",
@@ -2380,7 +2358,6 @@ class TestSetTaskInstanceNote(TestTaskInstanceEndpoint):
                 "priority_weight": 9,
                 "queue": "default_queue",
                 "queued_when": None,
-                "sla_miss": None,
                 "start_date": "2020-01-02T00:00:00+00:00",
                 "state": "running",
                 "task_id": "print_the_context",
diff --git a/tests/api_connexion/schemas/test_task_instance_schema.py b/tests/api_connexion/schemas/test_task_instance_schema.py
index 181e8351d2f..ff18ad75b54 100644
--- a/tests/api_connexion/schemas/test_task_instance_schema.py
+++ b/tests/api_connexion/schemas/test_task_instance_schema.py
@@ -26,7 +26,7 @@ from airflow.api_connexion.schemas.task_instance_schema import (
     set_task_instance_state_form,
     task_instance_schema,
 )
-from airflow.models import RenderedTaskInstanceFields as RTIF, SlaMiss, TaskInstance as TI
+from airflow.models import TaskInstance as TI
 from airflow.operators.empty import EmptyOperator
 from airflow.utils.platform import getuser
 from airflow.utils.state import State
@@ -64,12 +64,12 @@ class TestTaskInstanceSchema:
 
         session.rollback()
 
-    def test_task_instance_schema_without_sla_and_rendered(self, session):
+    def test_task_instance_schema_without_rendered(self, session):
         ti = TI(task=self.task, **self.default_ti_init)
         session.add(ti)
         for key, value in self.default_ti_extras.items():
             setattr(ti, key, value)
-        serialized_ti = task_instance_schema.dump((ti, None, None))
+        serialized_ti = task_instance_schema.dump(ti)
         expected_json = {
             "dag_id": "TEST_DAG_ID",
             "duration": 10000.0,
@@ -88,7 +88,6 @@ class TestTaskInstanceSchema:
             "priority_weight": 1,
             "queue": "default_queue",
             "queued_when": None,
-            "sla_miss": None,
             "start_date": "2020-01-02T00:00:00+00:00",
             "state": "running",
             "task_id": "TEST_TASK_ID",
@@ -103,63 +102,6 @@ class TestTaskInstanceSchema:
         }
         assert serialized_ti == expected_json
 
-    def test_task_instance_schema_with_sla_and_rendered(self, session):
-        sla_miss = SlaMiss(
-            task_id="TEST_TASK_ID",
-            dag_id="TEST_DAG_ID",
-            execution_date=self.default_time,
-        )
-        session.add(sla_miss)
-        session.flush()
-        ti = TI(task=self.task, **self.default_ti_init)
-        session.add(ti)
-        for key, value in self.default_ti_extras.items():
-            setattr(ti, key, value)
-        self.task.template_fields = ["partitions"]
-        setattr(self.task, "partitions", "data/ds=2022-02-17")
-        ti.rendered_task_instance_fields = RTIF(ti, render_templates=False)
-        serialized_ti = task_instance_schema.dump((ti, sla_miss))
-        expected_json = {
-            "dag_id": "TEST_DAG_ID",
-            "duration": 10000.0,
-            "end_date": "2020-01-03T00:00:00+00:00",
-            "execution_date": "2020-01-01T00:00:00+00:00",
-            "executor": None,
-            "executor_config": "{}",
-            "hostname": "",
-            "map_index": -1,
-            "max_tries": 0,
-            "note": "added some notes",
-            "operator": "EmptyOperator",
-            "pid": 100,
-            "pool": "default_pool",
-            "pool_slots": 1,
-            "priority_weight": 1,
-            "queue": "default_queue",
-            "queued_when": None,
-            "sla_miss": {
-                "dag_id": "TEST_DAG_ID",
-                "description": None,
-                "email_sent": False,
-                "execution_date": "2020-01-01T00:00:00+00:00",
-                "notification_sent": False,
-                "task_id": "TEST_TASK_ID",
-                "timestamp": None,
-            },
-            "start_date": "2020-01-02T00:00:00+00:00",
-            "state": "running",
-            "task_id": "TEST_TASK_ID",
-            "task_display_name": "TEST_TASK_ID",
-            "try_number": 0,
-            "unixname": getuser(),
-            "dag_run_id": None,
-            "rendered_fields": {"partitions": "data/ds=2022-02-17"},
-            "rendered_map_index": None,
-            "trigger": None,
-            "triggerer_job": None,
-        }
-        assert serialized_ti == expected_json
-
 
 class TestClearTaskInstanceFormSchema:
     @pytest.mark.parametrize(
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 5244e964909..f35ed29e7b1 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -81,7 +81,6 @@ from tests_common.test_utils.db import (
     clear_db_pools,
     clear_db_runs,
     clear_db_serialized_dags,
-    clear_db_sla_miss,
     set_default_pool_slots,
 )
 from tests_common.test_utils.mock_executor import MockExecutor
@@ -143,7 +142,6 @@ class TestSchedulerJob:
         clear_db_backfills()
         clear_db_pools()
         clear_db_dags()
-        clear_db_sla_miss()
         clear_db_import_errors()
         clear_db_jobs()
         clear_db_assets()
@@ -6236,7 +6234,6 @@ class TestSchedulerJobQueriesCount:
         clear_db_pools()
         clear_db_backfills()
         clear_db_dags()
-        clear_db_sla_miss()
         clear_db_import_errors()
         clear_db_jobs()
         clear_db_serialized_dags()
diff --git a/tests/listeners/test_dag_import_error_listener.py b/tests/listeners/test_dag_import_error_listener.py
index 52a6103dc5b..e636de4218c 100644
--- a/tests/listeners/test_dag_import_error_listener.py
+++ b/tests/listeners/test_dag_import_error_listener.py
@@ -39,7 +39,6 @@ from tests_common.test_utils.db import (
     clear_db_pools,
     clear_db_runs,
     clear_db_serialized_dags,
-    clear_db_sla_miss,
 )
 from tests_common.test_utils.mock_executor import MockExecutor
 
@@ -76,7 +75,6 @@ class TestDagFileProcessor:
         clear_db_runs()
         clear_db_pools()
         clear_db_dags()
-        clear_db_sla_miss()
         clear_db_import_errors()
         clear_db_jobs()
         clear_db_serialized_dags()
diff --git a/tests/notifications/test_basenotifier.py b/tests/notifications/test_basenotifier.py
index 65bda6ade8a..b2e3d751a6f 100644
--- a/tests/notifications/test_basenotifier.py
+++ b/tests/notifications/test_basenotifier.py
@@ -94,7 +94,6 @@ class TestBaseNotifier:
                 "dag": None,
                 "task_list": ["some_task"],
                 "blocking_task_list": None,
-                "slas": None,
                 "blocking_tis": None,
                 "message": "task: {{ task_list[0] }}",
             }
diff --git a/tests_common/test_utils/db.py b/tests_common/test_utils/db.py
index bf92c3a2582..c9df1f39700 100644
--- a/tests_common/test_utils/db.py
+++ b/tests_common/test_utils/db.py
@@ -29,7 +29,6 @@ from airflow.models import (
     Log,
     Pool,
     RenderedTaskInstanceFields,
-    SlaMiss,
     TaskFail,
     TaskInstance,
     TaskReschedule,
@@ -129,11 +128,6 @@ def clear_db_serialized_dags():
         session.query(SerializedDagModel).delete()
 
 
-def clear_db_sla_miss():
-    with create_session() as session:
-        session.query(SlaMiss).delete()
-
-
 def clear_db_pools():
     with create_session() as session:
         session.query(Pool).delete()
@@ -256,7 +250,6 @@ def clear_all():
     clear_db_assets()
     clear_db_dags()
     clear_db_serialized_dags()
-    clear_db_sla_miss()
     clear_db_dag_code()
     clear_db_callbacks()
     clear_rendered_ti_fields()


Reply via email to