This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4556828  AIP-39: DagRun.data_interval_start|end (#16352)
4556828 is described below

commit 4556828dccba2df0ab26b0aeeb26d0a30944da6f
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Thu Aug 12 22:16:57 2021 +0800

    AIP-39: DagRun.data_interval_start|end (#16352)
    
    Co-authored-by: Ash Berlin-Taylor <[email protected]>
---
 airflow/api/common/experimental/mark_tasks.py      |   4 +-
 .../api_connexion/endpoints/dag_run_endpoint.py    |  19 +-
 airflow/jobs/backfill_job.py                       |  47 +--
 airflow/jobs/scheduler_job.py                      |   5 +-
 ...ta_interval_start_end_to_dagmodel_and_dagrun.py |  77 +++++
 airflow/models/baseoperator.py                     |   7 +-
 airflow/models/dag.py                              | 168 ++++++++---
 airflow/models/dagrun.py                           |  12 +-
 airflow/models/taskinstance.py                     | 318 ++++++++++++++-------
 airflow/operators/python.py                        |   7 +-
 airflow/operators/subdag.py                        |   8 +-
 airflow/timetables/base.py                         |  56 ++--
 airflow/timetables/interval.py                     |   9 +-
 airflow/timetables/simple.py                       |   8 +-
 airflow/utils/file.py                              |   2 +
 docs/apache-airflow/migrations-ref.rst             |   4 +-
 docs/apache-airflow/templates-ref.rst              | 108 ++++---
 .../endpoints/test_dag_run_endpoint.py             |   5 +-
 tests/cli/commands/test_dag_command.py             |  28 +-
 tests/core/test_core.py                            |  45 +--
 tests/decorators/test_python.py                    |   2 +
 tests/jobs/test_backfill_job.py                    |  30 +-
 tests/jobs/test_scheduler_job.py                   |  31 +-
 tests/models/test_dag.py                           | 110 ++++---
 tests/models/test_taskinstance.py                  |  15 +-
 tests/operators/test_python.py                     |   2 +
 tests/operators/test_subdag_operator.py            |  50 +++-
 tests/sensors/test_base.py                         |   5 +-
 tests/sensors/test_python.py                       |   2 +
 .../perf/scheduler_dag_execution_timing.py         |   2 +-
 tests/timetables/test_time_table_iter_ranges.py    |  38 ---
 tests/www/views/test_views_trigger_dag.py          |   9 +-
 32 files changed, 848 insertions(+), 385 deletions(-)

diff --git a/airflow/api/common/experimental/mark_tasks.py 
b/airflow/api/common/experimental/mark_tasks.py
index 9a99d4d..30ecd42 100644
--- a/airflow/api/common/experimental/mark_tasks.py
+++ b/airflow/api/common/experimental/mark_tasks.py
@@ -257,7 +257,9 @@ def get_execution_dates(dag, execution_date, future, past):
         dag_runs = dag.get_dagruns_between(start_date=start_date, 
end_date=end_date)
         dates = sorted({d.execution_date for d in dag_runs})
     else:
-        dates = dag.get_run_dates(start_date, end_date, align=False)
+        dates = [
+            info.logical_date for info in 
dag.iter_dagrun_infos_between(start_date, end_date, align=False)
+        ]
     return dates
 
 
diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py 
b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index 27d13b6..80d04ab 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -31,6 +31,7 @@ from airflow.api_connexion.schemas.dag_run_schema import (
 from airflow.models import DagModel, DagRun
 from airflow.security import permissions
 from airflow.utils.session import provide_session
+from airflow.utils.state import State
 from airflow.utils.types import DagRunType
 
 
@@ -242,21 +243,29 @@ def post_dag_run(dag_id, session):
     except ValidationError as err:
         raise BadRequest(detail=str(err))
 
+    execution_date = post_body["execution_date"]
+    run_id = post_body["run_id"]
     dagrun_instance = (
         session.query(DagRun)
         .filter(
             DagRun.dag_id == dag_id,
-            or_(DagRun.run_id == post_body["run_id"], DagRun.execution_date == 
post_body["execution_date"]),
+            or_(DagRun.run_id == run_id, DagRun.execution_date == 
execution_date),
         )
         .first()
     )
     if not dagrun_instance:
-        dag_run = DagRun(dag_id=dag_id, run_type=DagRunType.MANUAL, 
**post_body)
-        session.add(dag_run)
-        session.commit()
+        dag_run = current_app.dag_bag.get_dag(dag_id).create_dagrun(
+            run_type=DagRunType.MANUAL,
+            run_id=run_id,
+            execution_date=execution_date,
+            state=State.QUEUED,
+            conf=post_body.get("conf"),
+            external_trigger=True,
+            dag_hash=current_app.dag_bag.dags_hash.get(dag_id),
+        )
         return dagrun_schema.dump(dag_run)
 
-    if dagrun_instance.execution_date == post_body["execution_date"]:
+    if dagrun_instance.execution_date == execution_date:
         raise AlreadyExists(
             detail=f"DAGRun with DAG ID: '{dag_id}' and "
             f"DAGRun ExecutionDate: '{post_body['execution_date']}' already 
exists"
diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py
index 880bdaa..5a8c77b 100644
--- a/airflow/jobs/backfill_job.py
+++ b/airflow/jobs/backfill_job.py
@@ -19,9 +19,9 @@
 
 import time
 from collections import OrderedDict
-from datetime import datetime
 from typing import Optional, Set
 
+import pendulum
 from sqlalchemy import and_
 from sqlalchemy.orm.session import Session, make_transient
 from tabulate import tabulate
@@ -42,6 +42,7 @@ from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
 from airflow.ti_deps.dep_context import DepContext
 from airflow.ti_deps.dependencies_deps import BACKFILL_QUEUED_DEPS
+from airflow.timetables.base import DagRunInfo
 from airflow.utils import helpers, timezone
 from airflow.utils.configuration import tmp_configuration_copy
 from airflow.utils.session import provide_session
@@ -283,17 +284,19 @@ class BackfillJob(BaseJob):
                 ti.handle_failure_with_callback(error=msg)
 
     @provide_session
-    def _get_dag_run(self, run_date: datetime, dag: DAG, session: Session = 
None):
+    def _get_dag_run(self, dagrun_info: DagRunInfo, dag: DAG, session: Session 
= None):
         """
         Returns a dag run for the given run date, which will be matched to an 
existing
         dag run if available or create a new dag run otherwise. If the 
max_active_runs
         limit is reached, this function will return None.
 
-        :param run_date: the execution date for the dag run
+        :param dagrun_info: Schedule information for the dag run
         :param dag: DAG
         :param session: the database session object
         :return: a DagRun in state RUNNING or None
         """
+        run_date = dagrun_info.logical_date
+
         # consider max_active_runs but ignore when running subdags
         respect_dag_max_active_limit = bool(dag.schedule_interval and not 
dag.is_subdag)
 
@@ -317,6 +320,7 @@ class BackfillJob(BaseJob):
 
         run = run or dag.create_dagrun(
             execution_date=run_date,
+            data_interval=dagrun_info.data_interval,
             start_date=timezone.utcnow(),
             state=State.RUNNING,
             external_trigger=False,
@@ -690,14 +694,14 @@ class BackfillJob(BaseJob):
         return err
 
     @provide_session
-    def _execute_for_run_dates(self, run_dates, ti_status, executor, 
pickle_id, start_date, session=None):
+    def _execute_dagruns(self, dagrun_infos, ti_status, executor, pickle_id, 
start_date, session=None):
         """
         Computes the dag runs and their respective task instances for
         the given run dates and executes the task instances.
         Returns a list of execution dates of the dag runs that were executed.
 
-        :param run_dates: Execution dates for dag runs
-        :type run_dates: list
+        :param dagrun_infos: Schedule information for dag runs
+        :type dagrun_infos: list[DagRunInfo]
         :param ti_status: internal BackfillJob status structure to tis track 
progress
         :type ti_status: BackfillJob._DagRunTaskStatus
         :param executor: the executor to use, it must be previously started
@@ -709,9 +713,9 @@ class BackfillJob(BaseJob):
         :param session: the current session object
         :type session: sqlalchemy.orm.session.Session
         """
-        for next_run_date in run_dates:
+        for dagrun_info in dagrun_infos:
             for dag in [self.dag] + self.dag.subdags:
-                dag_run = self._get_dag_run(next_run_date, dag, 
session=session)
+                dag_run = self._get_dag_run(dagrun_info, dag, session=session)
                 tis_map = self._task_instances_for_dag_run(dag_run, 
session=session)
                 if dag_run is None:
                     continue
@@ -755,8 +759,13 @@ class BackfillJob(BaseJob):
 
         start_date = self.bf_start_date
 
-        # Get intervals between the start/end dates, which will turn into dag 
runs
-        run_dates = self.dag.get_run_dates(start_date=start_date, 
end_date=self.bf_end_date, align=True)
+        # Get DagRun schedule between the start/end dates, which will turn 
into dag runs.
+        dagrun_start_date = timezone.coerce_datetime(start_date)
+        if self.bf_end_date is None:
+            dagrun_end_date = pendulum.now(timezone.utc)
+        else:
+            dagrun_end_date = pendulum.instance(self.bf_end_date)
+        dagrun_infos = 
list(self.dag.iter_dagrun_infos_between(dagrun_start_date, dagrun_end_date))
         if self.run_backwards:
             tasks_that_depend_on_past = [t.task_id for t in 
self.dag.task_dict.values() if t.depends_on_past]
             if tasks_that_depend_on_past:
@@ -765,9 +774,10 @@ class BackfillJob(BaseJob):
                         ",".join(tasks_that_depend_on_past)
                     )
                 )
-            run_dates = run_dates[::-1]
+            dagrun_infos = dagrun_infos[::-1]
 
-        if len(run_dates) == 0:
+        dagrun_info_count = len(dagrun_infos)
+        if dagrun_info_count == 0:
             self.log.info("No run dates were found for the given dates and dag 
interval.")
             return
 
@@ -788,17 +798,18 @@ class BackfillJob(BaseJob):
         executor.job_id = "backfill"
         executor.start()
 
-        ti_status.total_runs = len(run_dates)  # total dag runs in backfill
+        ti_status.total_runs = dagrun_info_count  # total dag runs in backfill
 
         try:
             remaining_dates = ti_status.total_runs
             while remaining_dates > 0:
-                dates_to_process = [
-                    run_date for run_date in run_dates if run_date not in 
ti_status.executed_dag_run_dates
+                dagrun_infos_to_process = [
+                    dagrun_info
+                    for dagrun_info in dagrun_infos
+                    if dagrun_info.logical_date not in 
ti_status.executed_dag_run_dates
                 ]
-
-                self._execute_for_run_dates(
-                    run_dates=dates_to_process,
+                self._execute_dagruns(
+                    dagrun_infos=dagrun_infos_to_process,
                     ti_status=ti_status,
                     executor=executor,
                     pickle_id=pickle_id,
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index c254200..e0845c6 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -953,14 +953,13 @@ class SchedulerJob(BaseJob):
                     run_type=DagRunType.SCHEDULED,
                     execution_date=dag_model.next_dagrun,
                     state=State.QUEUED,
+                    data_interval=dag_model.next_dagrun_data_interval,
                     external_trigger=False,
                     session=session,
                     dag_hash=dag_hash,
                     creating_job_id=self.id,
                 )
-            dag_model.next_dagrun, dag_model.next_dagrun_create_after = 
dag.next_dagrun_info(
-                dag_model.next_dagrun
-            )
+            dag_model.calculate_dagrun_date_fields(dag, dag_model.next_dagrun, 
0)
 
         # TODO[HA]: Should we do a session.flush() so we don't have to keep 
lots of state/object in
         # memory for larger dags? or expunge_all()
diff --git 
a/airflow/migrations/versions/142555e44c17_add_data_interval_start_end_to_dagmodel_and_dagrun.py
 
b/airflow/migrations/versions/142555e44c17_add_data_interval_start_end_to_dagmodel_and_dagrun.py
new file mode 100644
index 0000000..369e6a3
--- /dev/null
+++ 
b/airflow/migrations/versions/142555e44c17_add_data_interval_start_end_to_dagmodel_and_dagrun.py
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add data_interval_[start|end] to DagModel and DagRun.
+
+Revision ID: 142555e44c17
+Revises: 54bebd308c5f
+Create Date: 2021-06-09 08:28:02.089817
+
+"""
+
+from alembic import op
+from sqlalchemy import TIMESTAMP, Column
+from sqlalchemy.dialects import mssql, mysql
+
+# Revision identifiers, used by Alembic.
+revision = "142555e44c17"
+down_revision = "54bebd308c5f"
+branch_labels = None
+depends_on = None
+
+
+def _use_date_time2(conn):
+    result = conn.execute(
+        """SELECT CASE WHEN CONVERT(VARCHAR(128), SERVERPROPERTY 
('productversion'))
+        like '8%' THEN '2000' WHEN CONVERT(VARCHAR(128), SERVERPROPERTY 
('productversion'))
+        like '9%' THEN '2005' ELSE '2005Plus' END AS MajorVersion"""
+    ).fetchone()
+    mssql_version = result[0]
+    return mssql_version not in ("2000", "2005")
+
+
+def _get_timestamp(conn):
+    dialect_name = conn.dialect.name
+    if dialect_name == "mysql":
+        return mysql.TIMESTAMP(fsp=6, timezone=True)
+    if dialect_name != "mssql":
+        return TIMESTAMP(timezone=True)
+    if _use_date_time2(conn):
+        return mssql.DATETIME2(precision=6)
+    return mssql.DATETIME
+
+
+def upgrade():
+    """Apply data_interval fields to DagModel and DagRun."""
+    column_type = _get_timestamp(op.get_bind())
+    with op.batch_alter_table("dag_run") as batch_op:
+        batch_op.add_column(Column("data_interval_start", column_type))
+        batch_op.add_column(Column("data_interval_end", column_type))
+    with op.batch_alter_table("dag") as batch_op:
+        batch_op.add_column(Column("next_dagrun_data_interval_start", 
column_type))
+        batch_op.add_column(Column("next_dagrun_data_interval_end", 
column_type))
+
+
+def downgrade():
+    """Unapply data_interval fields to DagModel and DagRun."""
+    with op.batch_alter_table("dag_run") as batch_op:
+        batch_op.drop_column("data_interval_start")
+        batch_op.drop_column("data_interval_end")
+    with op.batch_alter_table("dag") as batch_op:
+        batch_op.drop_column("next_dagrun_data_interval_start")
+        batch_op.drop_column("next_dagrun_data_interval_end")
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index f83a3dc..e1cd6ae 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -1259,10 +1259,11 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, 
metaclass=BaseOperatorMeta
         start_date = start_date or self.start_date
         end_date = end_date or self.end_date or timezone.utcnow()
 
-        for execution_date in self.dag.get_run_dates(start_date, end_date, 
align=False):
-            TaskInstance(self, execution_date).run(
+        for info in self.dag.iter_dagrun_infos_between(start_date, end_date, 
align=False):
+            ignore_depends_on_past = info.logical_date == start_date and 
ignore_first_depends_on_past
+            TaskInstance(self, info.logical_date).run(
                 mark_success=mark_success,
-                ignore_depends_on_past=(execution_date == start_date and 
ignore_first_depends_on_past),
+                ignore_depends_on_past=ignore_depends_on_past,
                 ignore_ti_state=ignore_ti_state,
             )
 
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 82c4d58..fdfec18 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -70,7 +70,7 @@ from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import Context, TaskInstance, 
TaskInstanceKey, clear_task_instances
 from airflow.security import permissions
 from airflow.stats import Stats
-from airflow.timetables.base import TimeRestriction, Timetable
+from airflow.timetables.base import DagRunInfo, TimeRestriction, Timetable
 from airflow.timetables.interval import CronDataIntervalTimetable, 
DeltaDataIntervalTimetable
 from airflow.timetables.schedules import Schedule
 from airflow.timetables.simple import NullTimetable, OnceTimetable
@@ -479,8 +479,11 @@ class DAG(LoggingMixin):
         if num is not None:
             result = utils_date_range(start_date=start_date, num=num)
         else:
-            message += " Please use `DAG.get_run_dates(..., align=False)` 
instead."
-            result = self.get_run_dates(start_date, end_date, align=False)
+            message += " Please use `DAG.iter_dagrun_infos_between(..., 
align=False)` instead."
+            result = [
+                info.logical_date
+                for info in self.iter_dagrun_infos_between(start_date, 
end_date, align=False)
+            ]
         warnings.warn(message, category=DeprecationWarning, stacklevel=2)
         return result
 
@@ -524,7 +527,7 @@ class DAG(LoggingMixin):
     def next_dagrun_info(
         self,
         date_last_automated_dagrun: Optional[pendulum.DateTime],
-    ) -> Tuple[Optional[pendulum.DateTime], Optional[pendulum.DateTime]]:
+    ) -> Optional[DagRunInfo]:
         """Get information about the next DagRun of this dag after 
``date_last_automated_dagrun``.
 
         This calculates what time interval the next DagRun should operate on
@@ -537,22 +540,20 @@ class DAG(LoggingMixin):
         :param date_last_automated_dagrun: The ``max(execution_date)`` of
             existing "automated" DagRuns for this dag (scheduled or backfill,
             but not manual).
-        :return: A 2-tuple containing the DagRun's execution date, and the
-            earliest it could be scheduled.
+        :return: DagRunInfo of the next dagrun, or None if a dagrun is not
+            going to be scheduled.
         """
-        # XXX: The timezone.coerce_datetime calls in this function should not
-        # be necessary since the function annotation suggests it only accepts
-        # pendulum.DateTime, and someone is passing datetime.datetime into this
-        # function. We should fix whatever is doing that.
+        # Never schedule a subdag. It will be scheduled by its parent dag.
         if self.is_subdag:
-            return (None, None)
-        next_info = self.timetable.next_dagrun_info(
+            return None
+        # XXX: The timezone.coerce_datetime calls should not be necessary since
+        # the function annotation suggests it only accepts pendulum.DateTime,
+        # and someone is passing datetime.datetime into this function. We 
should
+        # fix whatever is doing that.
+        return self.timetable.next_dagrun_info(
             timezone.coerce_datetime(date_last_automated_dagrun),
             self._time_restriction,
         )
-        if next_info is None:
-            return (None, None)
-        return (next_info.data_interval.start, next_info.run_after)
 
     def next_dagrun_after_date(self, date_last_automated_dagrun: 
Optional[pendulum.DateTime]):
         warnings.warn(
@@ -560,7 +561,10 @@ class DAG(LoggingMixin):
             category=DeprecationWarning,
             stacklevel=2,
         )
-        return self.next_dagrun_info(date_last_automated_dagrun)[0]
+        info = self.next_dagrun_info(date_last_automated_dagrun)
+        if info is None:
+            return None
+        return info.run_after
 
     @cached_property
     def _time_restriction(self) -> TimeRestriction:
@@ -594,7 +598,64 @@ class DAG(LoggingMixin):
         type_name = type(interval).__name__
         raise TypeError(f"{type_name} is not a valid DAG.schedule_interval.")
 
-    def get_run_dates(self, start_date, end_date=None, *, align: bool = True):
+    def iter_dagrun_infos_between(
+        self,
+        earliest: Optional[pendulum.DateTime],
+        latest: pendulum.DateTime,
+        *,
+        align: bool = True,
+    ) -> Iterable[DagRunInfo]:
+        """Yield DagRunInfo using this DAG's timetable between given interval.
+
+        DagRunInfo instances are yielded if their ``logical_date`` is not earlier
+        than ``earliest``, nor later than ``latest``. The instances are ordered
+        by their ``logical_date`` from earliest to latest.
+
+        If ``align`` is ``False``, the first run will happen immediately on
+        ``earliest``, even if it does not fall on the logical timetable 
schedule.
+        The default is ``True``, but subdags will ignore this value and always
+        behave as if this is set to ``False`` for backward compatibility.
+
+        Example: A DAG is scheduled to run every midnight (``0 0 * * *``). If
+        ``earliest`` is ``2021-06-03 23:00:00``, the first DagRunInfo would be
+        ``2021-06-03 23:00:00`` if ``align=False``, and ``2021-06-04 00:00:00``
+        if ``align=True``.
+        """
+        if earliest is None:
+            earliest = self._time_restriction.earliest
+        earliest = timezone.coerce_datetime(earliest)
+        latest = timezone.coerce_datetime(latest)
+
+        restriction = TimeRestriction(earliest, latest, catchup=True)
+
+        # HACK: Sub-DAGs are currently scheduled differently. For example, say
+        # the schedule is @daily and start is 2021-06-03 22:16:00, a top-level
+        # DAG should be first scheduled to run on midnight 2021-06-04, but a
+        # sub-DAG should be first scheduled to run RIGHT NOW. We can change
+        # this, but since sub-DAGs are going away in 3.0 anyway, let's keep
+        # compatibility for now and remove this entirely later.
+        if self.is_subdag:
+            align = False
+
+        info = self.timetable.next_dagrun_info(None, restriction)
+        if info is None:
+            # No runs to be scheduled between the user-supplied timeframe. But
+            # if align=False, "invent" a data interval for the timeframe 
itself.
+            if not align:
+                yield DagRunInfo.interval(earliest, latest)
+            return
+
+        # If align=False and earliest does not fall on the timetable's logical
+        # schedule, "invent" a data interval for it.
+        if not align and info.logical_date != earliest:
+            yield DagRunInfo.interval(earliest, info.data_interval.start)
+
+        # Generate naturally according to schedule.
+        while info is not None:
+            yield info
+            info = self.timetable.next_dagrun_info(info.logical_date, 
restriction)
+
+    def get_run_dates(self, start_date, end_date=None):
         """
         Returns a list of dates between the interval received as parameter 
using this
         dag's schedule interval. Returned dates can be used for execution 
dates.
@@ -603,31 +664,20 @@ class DAG(LoggingMixin):
         :type start_date: datetime
         :param end_date: The end date of the interval. Defaults to 
``timezone.utcnow()``.
         :type end_date: datetime
-        :param align: Whether the first run should be delayed to "align" with
-            the schedule, or can happen immediately at start_date. The default 
is
-            ``True``, but subdags will ignore this value and always behave as 
if
-            this is set to ``False`` for backward compatibility.
-        :type align: bool
         :return: A list of dates within the interval following the dag's 
schedule.
         :rtype: list
         """
-        if start_date is None:
-            start = self._time_restriction.earliest
-        else:
-            start = pendulum.instance(start_date)
+        warnings.warn(
+            "`DAG.get_run_dates()` is deprecated. Please use 
`DAG.iter_dagrun_infos_between()` instead.",
+            category=DeprecationWarning,
+            stacklevel=2,
+        )
+        earliest = timezone.coerce_datetime(start_date)
         if end_date is None:
-            end = pendulum.now(timezone.utc)
+            latest = pendulum.now(timezone.utc)
         else:
-            end = pendulum.instance(end_date)
-        # HACK: Sub-DAGs are currently scheduled differently. For example, say
-        # the schedule is @daily and start is 2021-06-03 22:16:00, a top-level
-        # DAG should be first scheduled to run on midnight 2021-06-04, but a
-        # sub-DAG should be first scheduled to run RIGHT NOW. We can change
-        # this, but since the sub-DAG is going away in 3.0 anyway, let's keep
-        # compatibility for now and remove this entirely later.
-        if self.is_subdag:
-            align = False
-        return sorted(self.timetable.iter_between(start, end, align=align))
+            latest = pendulum.instance(end_date)
+        return [info.logical_date for info in 
self.iter_dagrun_infos_between(earliest, latest)]
 
     def normalize_schedule(self, dttm):
         warnings.warn(
@@ -2018,6 +2068,7 @@ class DAG(LoggingMixin):
         session=None,
         dag_hash: Optional[str] = None,
         creating_job_id: Optional[int] = None,
+        data_interval: Optional[Tuple[datetime, datetime]] = None,
     ):
         """
         Creates a dag run from this dag including the tasks associated with 
this dag.
@@ -2043,6 +2094,8 @@ class DAG(LoggingMixin):
         :type session: sqlalchemy.orm.session.Session
         :param dag_hash: Hash of Serialized DAG
         :type dag_hash: str
+        :param data_interval: Data interval of the DagRun
+        :type data_interval: tuple[datetime, datetime] | None
         """
         if run_id and not run_type:
             if not isinstance(run_id, str):
@@ -2057,6 +2110,9 @@ class DAG(LoggingMixin):
                 "Creating DagRun needs either `run_id` or both `run_type` and 
`execution_date`"
             )
 
+        if run_type == DagRunType.MANUAL and data_interval is None and 
execution_date is not None:
+            data_interval = 
self.timetable.infer_data_interval(timezone.coerce_datetime(execution_date))
+
         run = DagRun(
             dag_id=self.dag_id,
             run_id=run_id,
@@ -2068,6 +2124,7 @@ class DAG(LoggingMixin):
             run_type=run_type,
             dag_hash=dag_hash,
             creating_job_id=creating_job_id,
+            data_interval=data_interval,
         )
         session.add(run)
         session.flush()
@@ -2410,9 +2467,14 @@ class DagModel(Base):
 
     has_task_concurrency_limits = Column(Boolean, nullable=False)
 
-    # The execution_date of the next dag run
+    # The logical date of the next dag run.
     next_dagrun = Column(UtcDateTime)
-    # Earliest time at which this ``next_dagrun`` can be created
+
+    # Must be either both NULL or both datetime.
+    next_dagrun_data_interval_start = Column(UtcDateTime)
+    next_dagrun_data_interval_end = Column(UtcDateTime)
+
+    # Earliest time at which this ``next_dagrun`` can be created.
     next_dagrun_create_after = Column(UtcDateTime)
 
     __table_args__ = (
@@ -2446,6 +2508,26 @@ class DagModel(Base):
         return f"<DAG: {self.dag_id}>"
 
     @property
+    def next_dagrun_data_interval(self) -> Optional[Tuple[datetime, datetime]]:
+        if self.next_dagrun_data_interval_start is None:
+            if self.next_dagrun_data_interval_end is not None:
+                raise AirflowException(
+                    f"Inconsistent DagModel: "
+                    
f"next_dagrun_data_interval_start={self.next_dagrun_data_interval_start!r}, "
+                    
f"next_dagrun_data_interval_end={self.next_dagrun_data_interval_end!r}; "
+                    f"they must be either both None or both datetime"
+                )
+            return None
+        return (self.next_dagrun_data_interval_start, 
self.next_dagrun_data_interval_end)
+
+    @next_dagrun_data_interval.setter
+    def next_dagrun_data_interval(self, value: Optional[Tuple[datetime, 
datetime]]) -> None:
+        if value is None:
+            self.next_dagrun_data_interval_start = 
self.next_dagrun_data_interval_end = None
+        else:
+            self.next_dagrun_data_interval_start, 
self.next_dagrun_data_interval_end = value
+
+    @property
     def timezone(self):
         return settings.TIMEZONE
 
@@ -2588,7 +2670,13 @@ class DagModel(Base):
         :param most_recent_dag_run: DateTime of most recent run of this dag, 
or none if not yet scheduled.
         :param active_runs_of_dag: Number of currently active runs of this dag
         """
-        self.next_dagrun, self.next_dagrun_create_after = 
dag.next_dagrun_info(most_recent_dag_run)
+        next_dagrun_info = dag.next_dagrun_info(most_recent_dag_run)
+        if next_dagrun_info is None:
+            self.next_dagrun_data_interval = self.next_dagrun = 
self.next_dagrun_create_after = None
+        else:
+            self.next_dagrun_data_interval = next_dagrun_info.data_interval
+            self.next_dagrun = next_dagrun_info.logical_date
+            self.next_dagrun_create_after = next_dagrun_info.run_after
 
         if dag.max_active_runs and active_runs_of_dag >= dag.max_active_runs:
             # Since this happens every time the dag is parsed it would be 
quite spammy at info
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 508312b..098fd5b 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -76,6 +76,9 @@ class DagRun(Base, LoggingMixin):
     external_trigger = Column(Boolean, default=True)
     run_type = Column(String(50), nullable=False)
     conf = Column(PickleType)
+    # These two must be either both NULL or both datetime.
+    data_interval_start = Column(UtcDateTime)
+    data_interval_end = Column(UtcDateTime)
     # When a scheduler last attempted to schedule TIs for this DagRun
     last_scheduling_decision = Column(UtcDateTime)
     dag_hash = Column(String(32))
@@ -91,7 +94,7 @@ class DagRun(Base, LoggingMixin):
 
     task_instances = relationship(
         TI,
-        primaryjoin=and_(TI.dag_id == dag_id, TI.execution_date == 
execution_date),  # type: ignore
+        primaryjoin=and_(TI.dag_id == dag_id, TI.execution_date == 
execution_date),
         foreign_keys=(dag_id, execution_date),
         backref=backref('dag_run', uselist=False),
     )
@@ -115,7 +118,14 @@ class DagRun(Base, LoggingMixin):
         run_type: Optional[str] = None,
         dag_hash: Optional[str] = None,
         creating_job_id: Optional[int] = None,
+        data_interval: Optional[Tuple[datetime, datetime]] = None,
     ):
+        if data_interval is None:
+            # Legacy: Only happens for runs created prior to Airflow 2.2.
+            self.data_interval_start = self.data_interval_end = None
+        else:
+            self.data_interval_start, self.data_interval_end = data_interval
+
         self.dag_id = dag_id
         self.run_id = run_id
         self.execution_date = execution_date
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 6c2ca8d..6f145f9 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -23,7 +23,7 @@ import os
 import pickle
 import signal
 import warnings
-from collections import defaultdict
+from collections import defaultdict, namedtuple
 from datetime import datetime, timedelta
 from functools import partial
 from tempfile import NamedTemporaryFile
@@ -54,6 +54,7 @@ from sqlalchemy.sql.expression import tuple_
 from sqlalchemy.sql.sqltypes import BigInteger
 
 from airflow import settings
+from airflow.compat.functools import cache
 from airflow.configuration import conf
 from airflow.exceptions import (
     AirflowException,
@@ -818,51 +819,69 @@ class TaskInstance(Base, LoggingMixin):
         return count == len(task.downstream_task_ids)
 
     @provide_session
-    def get_previous_ti(
-        self, state: Optional[str] = None, session: Session = None
-    ) -> Optional['TaskInstance']:
-        """
-        The task instance for the task that ran before this task instance.
+    def get_previous_dagrun(
+        self,
+        state: Optional[str] = None,
+        session: Optional[Session] = None,
+    ) -> Optional["DagRun"]:
+        """The DagRun that ran before this task instance's DagRun.
 
         :param state: If passed, it only take into account instances of a 
specific state.
-        :param session: SQLAlchemy ORM Session
+        :param session: SQLAlchemy ORM Session.
         """
         dag = self.task.dag
-        if dag:
-            dr = self.get_dagrun(session=session)
+        if dag is None:
+            return None
 
-            # LEGACY: most likely running from unit tests
-            if not dr:
-                # Means that this TaskInstance is NOT being run from a DR, but 
from a catchup
-                try:
-                    # XXX: This uses DAG internals, but as the outer comment
-                    # said, the block is only reached for legacy reasons for
-                    # development code, so that's OK-ish.
-                    schedule = dag.timetable._schedule
-                except AttributeError:
-                    return None
-                dt = pendulum.instance(self.execution_date)
-                return TaskInstance(
-                    task=self.task,
-                    execution_date=schedule.get_prev(dt),
-                )
+        dr = self.get_dagrun(session=session)
+
+        # LEGACY: most likely running from unit tests
+        if not dr:
+            # Means that this TaskInstance is NOT being run from a DR, but 
from a catchup
+            try:
+                # XXX: This uses DAG internals, but as the outer comment
+                # said, the block is only reached for legacy reasons for
+                # development code, so that's OK-ish.
+                schedule = dag.timetable._schedule
+            except AttributeError:
+                return None
+            dt = pendulum.instance(self.execution_date)
+            return TaskInstance(
+                task=self.task,
+                execution_date=schedule.get_prev(dt),
+            )
 
-            dr.dag = dag
+        dr.dag = dag
 
-            # We always ignore schedule in dagrun lookup when `state` is given 
or `schedule_interval is None`.
-            # For legacy reasons, when `catchup=True`, we use 
`get_previous_scheduled_dagrun` unless
-            # `ignore_schedule` is `True`.
-            ignore_schedule = state is not None or dag.schedule_interval is 
None
-            if dag.catchup is True and not ignore_schedule:
-                last_dagrun = dr.get_previous_scheduled_dagrun(session=session)
-            else:
-                last_dagrun = dr.get_previous_dagrun(session=session, 
state=state)
+        # We always ignore schedule in dagrun lookup when `state` is given or 
`schedule_interval is None`.
+        # For legacy reasons, when `catchup=True`, we use 
`get_previous_scheduled_dagrun` unless
+        # `ignore_schedule` is `True`.
+        ignore_schedule = state is not None or dag.schedule_interval is None
+        if dag.catchup is True and not ignore_schedule:
+            last_dagrun = dr.get_previous_scheduled_dagrun(session=session)
+        else:
+            last_dagrun = dr.get_previous_dagrun(session=session, state=state)
 
-            if last_dagrun:
-                return last_dagrun.get_task_instance(self.task_id, 
session=session)
+        if last_dagrun:
+            return last_dagrun
 
         return None
 
+    @provide_session
+    def get_previous_ti(
+        self, state: Optional[str] = None, session: Session = None
+    ) -> Optional['TaskInstance']:
+        """
+        The task instance for the task that ran before this task instance.
+
+        :param state: If passed, it only takes into account instances of a 
specific state.
+        :param session: SQLAlchemy ORM Session
+        """
+        dagrun = self.get_previous_dagrun(state, session=session)
+        if dagrun is None:
+            return None
+        return dagrun.get_task_instance(self.task_id, session=session)
+
     @property
     def previous_ti(self):
         """
@@ -1685,66 +1704,63 @@ class TaskInstance(Base, LoggingMixin):
 
         integrate_macros_plugins()
 
-        params = {}  # type: Dict[str, Any]
-        run_id = ''
-        dag_run = None
-        if hasattr(task, 'dag'):
-            if task.dag.params:
-                params.update(task.dag.params)
-            from airflow.models.dagrun import DagRun  # Avoid circular import
-
-            dag_run = (
-                session.query(DagRun)
-                .filter_by(dag_id=task.dag.dag_id, 
execution_date=self.execution_date)
-                .first()
+        dag_run = self.get_dagrun()
+
+        # FIXME: Many tests don't create a DagRun. We should fix the tests.
+        if dag_run is None:
+            FakeDagRun = namedtuple(
+                "FakeDagRun",
+                # A minimal set of attributes to keep things working.
+                "conf data_interval_start data_interval_end external_trigger 
run_id",
+            )
+            dag_run = FakeDagRun(
+                conf=None,
+                data_interval_start=None,
+                data_interval_end=None,
+                external_trigger=False,
+                run_id="",
             )
-            run_id = dag_run.run_id if dag_run else None
-            session.expunge_all()
-            session.commit()
 
-        ds = self.execution_date.strftime('%Y-%m-%d')
-        ts = self.execution_date.isoformat()
-        yesterday_ds = (self.execution_date - 
timedelta(1)).strftime('%Y-%m-%d')
-        tomorrow_ds = (self.execution_date + timedelta(1)).strftime('%Y-%m-%d')
-
-        # For manually triggered dagruns that aren't run on a schedule, 
next/previous
-        # schedule dates don't make sense, and should be set to execution date 
for
-        # consistency with how execution_date is set for manually triggered 
tasks, i.e.
-        # triggered_date == execution_date.
-        if dag_run and dag_run.external_trigger:
-            prev_execution_date = self.execution_date
-            next_execution_date = self.execution_date
-        else:
-            prev_execution_date = 
task.dag.previous_schedule(self.execution_date)
-            next_execution_date = 
task.dag.following_schedule(self.execution_date)
-
-        next_ds = None
-        next_ds_nodash = None
-        if next_execution_date:
-            next_ds = next_execution_date.strftime('%Y-%m-%d')
-            next_ds_nodash = next_ds.replace('-', '')
-            next_execution_date = pendulum.instance(next_execution_date)
-
-        prev_ds = None
-        prev_ds_nodash = None
-        if prev_execution_date:
-            prev_ds = prev_execution_date.strftime('%Y-%m-%d')
-            prev_ds_nodash = prev_ds.replace('-', '')
-            prev_execution_date = pendulum.instance(prev_execution_date)
+        params = {}  # type: Dict[str, Any]
+        with contextlib.suppress(AttributeError):
+            params.update(task.dag.params)
+        if task.params:
+            params.update(task.params)
+        if conf.getboolean('core', 'dag_run_conf_overrides_params'):
+            self.overwrite_params_with_dag_run_conf(params=params, 
dag_run=dag_run)
 
+        # DagRuns scheduled prior to Airflow 2.2 and by tests don't always have
+        # a data interval, and we default to execution_date for compatibility.
+        compat_interval_start = 
timezone.coerce_datetime(dag_run.data_interval_start or self.execution_date)
+        ds = compat_interval_start.strftime('%Y-%m-%d')
         ds_nodash = ds.replace('-', '')
-        ts_nodash = self.execution_date.strftime('%Y%m%dT%H%M%S')
+        ts = compat_interval_start.isoformat()
+        ts_nodash = compat_interval_start.strftime('%Y%m%dT%H%M%S')
         ts_nodash_with_tz = ts.replace('-', '').replace(':', '')
-        yesterday_ds_nodash = yesterday_ds.replace('-', '')
-        tomorrow_ds_nodash = tomorrow_ds.replace('-', '')
 
-        ti_key_str = f"{task.dag_id}__{task.task_id}__{ds_nodash}"
+        @cache  # Prevent multiple database access.
+        def _get_previous_dagrun_success() -> Optional["DagRun"]:
+            return self.get_previous_dagrun(state=State.SUCCESS, 
session=session)
 
-        if task.params:
-            params.update(task.params)
+        def get_prev_data_interval_start_success() -> 
Optional[pendulum.DateTime]:
+            dagrun = _get_previous_dagrun_success()
+            if dagrun is None:
+                return None
+            return timezone.coerce_datetime(dagrun.data_interval_start)
 
-        if conf.getboolean('core', 'dag_run_conf_overrides_params'):
-            self.overwrite_params_with_dag_run_conf(params=params, 
dag_run=dag_run)
+        def get_prev_data_interval_end_success() -> 
Optional[pendulum.DateTime]:
+            dagrun = _get_previous_dagrun_success()
+            if dagrun is None:
+                return None
+            return timezone.coerce_datetime(dagrun.data_interval_end)
+
+        def get_prev_start_date_success() -> Optional[pendulum.DateTime]:
+            dagrun = _get_previous_dagrun_success()
+            if dagrun is None:
+                return None
+            return timezone.coerce_datetime(dagrun.start_date)
+
+        # Custom accessors.
 
         class VariableAccessor:
             """
@@ -1826,37 +1842,129 @@ class TaskInstance(Base, LoggingMixin):
                 except AirflowNotFoundException:
                     return default_conn
 
+        # Create lazy proxies for deprecated stuff.
+
+        def deprecated_proxy(func, *, key, replacement=None) -> 
lazy_object_proxy.Proxy:
+            def deprecated_func():
+                message = (
+                    f"Accessing {key!r} from the template is deprecated and "
+                    f"will be removed in a future version."
+                )
+                if replacement:
+                    message += f" Please use {replacement!r} instead."
+                warnings.warn(message, DeprecationWarning)
+                return func()
+
+            return lazy_object_proxy.Proxy(deprecated_func)
+
+        @cache
+        def get_yesterday_ds() -> str:
+            return (self.execution_date - timedelta(1)).strftime('%Y-%m-%d')
+
+        def get_yesterday_ds_nodash() -> str:
+            return get_yesterday_ds().replace('-', '')
+
+        @cache
+        def get_tomorrow_ds() -> str:
+            return (self.execution_date + timedelta(1)).strftime('%Y-%m-%d')
+
+        def get_tomorrow_ds_nodash() -> str:
+            return get_tomorrow_ds().replace('-', '')
+
+        @cache
+        def get_next_execution_date() -> Optional[pendulum.DateTime]:
+            # For manually triggered dagruns that aren't run on a schedule,
+            # next/previous execution dates don't make sense, and should be set
+            # to execution date for consistency with how execution_date is set
+            # for manually triggered tasks, i.e. triggered_date == 
execution_date.
+            if dag_run.external_trigger:
+                next_execution_date = self.execution_date
+            else:
+                next_execution_date = 
task.dag.following_schedule(self.execution_date)
+            if next_execution_date is None:
+                return None
+            return timezone.coerce_datetime(next_execution_date)
+
+        def get_next_ds() -> Optional[str]:
+            execution_date = get_next_execution_date()
+            if execution_date is None:
+                return None
+            return execution_date.strftime('%Y-%m-%d')
+
+        def get_next_ds_nodash() -> Optional[str]:
+            ds = get_next_ds()
+            if ds is None:
+                return ds
+            return ds.replace('-', '')
+
+        @cache
+        def get_prev_execution_date():
+            if dag_run.external_trigger:
+                return timezone.coerce_datetime(self.execution_date)
+            with warnings.catch_warnings():
+                warnings.simplefilter("ignore", DeprecationWarning)
+                return task.dag.previous_schedule(self.execution_date)
+
+        @cache
+        def get_prev_ds() -> Optional[str]:
+            execution_date = get_prev_execution_date()
+            if execution_date is None:
+                return None
+            return execution_date.strftime(r'%Y-%m-%d')
+
+        def get_prev_ds_nodash() -> Optional[str]:
+            prev_ds = get_prev_ds()
+            if prev_ds is None:
+                return None
+            return prev_ds.replace('-', '')
+
         return {
             'conf': conf,
             'dag': task.dag,
             'dag_run': dag_run,
+            'data_interval_end': 
timezone.coerce_datetime(dag_run.data_interval_end),
+            'data_interval_start': 
timezone.coerce_datetime(dag_run.data_interval_start),
             'ds': ds,
             'ds_nodash': ds_nodash,
-            'execution_date': pendulum.instance(self.execution_date),
+            'execution_date': deprecated_proxy(
+                lambda: timezone.coerce_datetime(self.execution_date),
+                key='execution_date',
+                replacement='data_interval_start',
+            ),
             'inlets': task.inlets,
             'macros': macros,
-            'next_ds': next_ds,
-            'next_ds_nodash': next_ds_nodash,
-            'next_execution_date': next_execution_date,
+            'next_ds': deprecated_proxy(get_next_ds, key="next_ds", 
replacement="data_interval_end | ds"),
+            'next_ds_nodash': deprecated_proxy(
+                get_next_ds_nodash,
+                key="next_ds_nodash",
+                replacement="data_interval_end | ds_nodash",
+            ),
+            'next_execution_date': deprecated_proxy(
+                get_next_execution_date,
+                key='next_execution_date',
+                replacement='data_interval_end',
+            ),
             'outlets': task.outlets,
             'params': params,
-            'prev_ds': prev_ds,
-            'prev_ds_nodash': prev_ds_nodash,
-            'prev_execution_date': prev_execution_date,
-            'prev_execution_date_success': lazy_object_proxy.Proxy(
-                lambda: self.get_previous_execution_date(state=State.SUCCESS)
-            ),
-            'prev_start_date_success': lazy_object_proxy.Proxy(
-                lambda: self.get_previous_start_date(state=State.SUCCESS)
+            'prev_data_interval_start_success': 
lazy_object_proxy.Proxy(get_prev_data_interval_start_success),
+            'prev_data_interval_end_success': 
lazy_object_proxy.Proxy(get_prev_data_interval_end_success),
+            'prev_ds': deprecated_proxy(get_prev_ds, key="prev_ds"),
+            'prev_ds_nodash': deprecated_proxy(get_prev_ds_nodash, 
key="prev_ds_nodash"),
+            'prev_execution_date': deprecated_proxy(get_prev_execution_date, 
key='prev_execution_date'),
+            'prev_execution_date_success': deprecated_proxy(
+                lambda: self.get_previous_execution_date(state=State.SUCCESS, 
session=session),
+                key='prev_execution_date_success',
+                replacement='prev_data_interval_start_success',
             ),
-            'run_id': run_id,
+            'prev_start_date_success': 
lazy_object_proxy.Proxy(get_prev_start_date_success),
+            'run_id': dag_run.run_id,
             'task': task,
             'task_instance': self,
-            'task_instance_key_str': ti_key_str,
+            'task_instance_key_str': 
f"{task.dag_id}__{task.task_id}__{ds_nodash}",
             'test_mode': self.test_mode,
             'ti': self,
-            'tomorrow_ds': tomorrow_ds,
-            'tomorrow_ds_nodash': tomorrow_ds_nodash,
+            'tomorrow_ds': deprecated_proxy(get_tomorrow_ds, 
key='tomorrow_ds'),
+            'tomorrow_ds_nodash': deprecated_proxy(get_tomorrow_ds_nodash, 
key='tomorrow_ds_nodash'),
             'ts': ts,
             'ts_nodash': ts_nodash,
             'ts_nodash_with_tz': ts_nodash_with_tz,
@@ -1865,8 +1973,8 @@ class TaskInstance(Base, LoggingMixin):
                 'value': VariableAccessor(),
             },
             'conn': ConnectionAccessor(),
-            'yesterday_ds': yesterday_ds,
-            'yesterday_ds_nodash': yesterday_ds_nodash,
+            'yesterday_ds': deprecated_proxy(get_yesterday_ds, 
key='yesterday_ds'),
+            'yesterday_ds_nodash': deprecated_proxy(get_yesterday_ds_nodash, 
key='yesterday_ds_nodash'),
         }
 
     def get_rendered_template_fields(self):
diff --git a/airflow/operators/python.py b/airflow/operators/python.py
index 511cb99..c0f6393 100644
--- a/airflow/operators/python.py
+++ b/airflow/operators/python.py
@@ -341,8 +341,11 @@ class PythonVirtualenvOperator(PythonOperator):
         self.python_version = python_version
         self.use_dill = use_dill
         self.system_site_packages = system_site_packages
-        if not self.system_site_packages and self.use_dill and 'dill' not in 
self.requirements:
-            self.requirements.append('dill')
+        if not self.system_site_packages:
+            if 'lazy-object-proxy' not in self.requirements:
+                self.requirements.append('lazy-object-proxy')
+            if self.use_dill and 'dill' not in self.requirements:
+                self.requirements.append('dill')
         self.pickling_library = dill if self.use_dill else pickle
 
     def execute(self, context: Dict):
diff --git a/airflow/operators/subdag.py b/airflow/operators/subdag.py
index 66ac6fb..62b9b77 100644
--- a/airflow/operators/subdag.py
+++ b/airflow/operators/subdag.py
@@ -21,8 +21,9 @@ The module which provides a way to nest your DAGs and so your 
levels of complexi
 """
 
 import warnings
+from datetime import datetime
 from enum import Enum
-from typing import Dict, Optional
+from typing import Dict, Optional, Tuple
 
 from sqlalchemy.orm.session import Session
 
@@ -159,12 +160,17 @@ class SubDagOperator(BaseSensorOperator):
         dag_run = self._get_dagrun(execution_date)
 
         if dag_run is None:
+            if context['data_interval_start'] is None or 
context['data_interval_end'] is None:
+                data_interval: Optional[Tuple[datetime, datetime]] = None
+            else:
+                data_interval = (context['data_interval_start'], 
context['data_interval_end'])
             dag_run = self.subdag.create_dagrun(
                 run_type=DagRunType.SCHEDULED,
                 execution_date=execution_date,
                 state=State.RUNNING,
                 conf=self.conf,
                 external_trigger=True,
+                data_interval=data_interval,
             )
             self.log.info("Created DagRun: %s", dag_run.run_id)
         else:
diff --git a/airflow/timetables/base.py b/airflow/timetables/base.py
index 5faf9db..7027851 100644
--- a/airflow/timetables/base.py
+++ b/airflow/timetables/base.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import Iterator, NamedTuple, Optional
+from typing import NamedTuple, Optional
 
 from pendulum import DateTime
 
@@ -23,14 +23,16 @@ from airflow.typing_compat import Protocol
 
 
 class DataInterval(NamedTuple):
-    """A data interval for a DagRun to operate over.
-
-    The represented interval is ``[start, end)``.
-    """
+    """A data interval for a DagRun to operate over."""
 
     start: DateTime
     end: DateTime
 
+    @classmethod
+    def exact(cls, at: DateTime) -> "DataInterval":
+        """Represent an "interval" containing only an exact time."""
+        return cls(start=at, end=at)
+
 
 class TimeRestriction(NamedTuple):
     """Restriction on when a DAG can be scheduled for a run.
@@ -62,12 +64,12 @@ class DagRunInfo(NamedTuple):
     """The earliest time this DagRun is created and its tasks scheduled."""
 
     data_interval: DataInterval
-    """The data interval this DagRun to operate over, if applicable."""
+    """The data interval for this DagRun to operate over."""
 
     @classmethod
     def exact(cls, at: DateTime) -> "DagRunInfo":
         """Represent a run on an exact time."""
-        return cls(run_after=at, data_interval=DataInterval(at, at))
+        return cls(run_after=at, data_interval=DataInterval.exact(at))
 
     @classmethod
     def interval(cls, start: DateTime, end: DateTime) -> "DagRunInfo":
@@ -79,6 +81,15 @@ class DagRunInfo(NamedTuple):
         """
         return cls(run_after=end, data_interval=DataInterval(start, end))
 
+    @property
+    def logical_date(self) -> DateTime:
+        """Infer the logical date to represent a DagRun.
+
+        This replaces ``execution_date`` in Airflow 2.1 and prior. The idea is
+        essentially the same, just a different name.
+        """
+        return self.data_interval.start
+
 
 class Timetable(Protocol):
     """Protocol that all Timetable classes are expected to implement."""
@@ -90,6 +101,14 @@ class Timetable(Protocol):
         """
         raise NotImplementedError()
 
+    def infer_data_interval(self, run_after: DateTime) -> DataInterval:
+        """When a DAG run is manually triggered, infer a data interval for it.
+
+        This is used for e.g. manually-triggered runs, where ``run_after`` 
would
+        be when the user triggers the run.
+        """
+        raise NotImplementedError()
+
     def next_dagrun_info(
         self,
         last_automated_dagrun: Optional[DateTime],
@@ -108,26 +127,3 @@ class Timetable(Protocol):
             a DagRunInfo object when asked at another time.
         """
         raise NotImplementedError()
-
-    def iter_between(
-        self,
-        start: DateTime,
-        end: DateTime,
-        *,
-        align: bool,
-    ) -> Iterator[DateTime]:
-        """Get schedules between the *start* and *end*."""
-        if start > end:
-            raise ValueError(f"start ({start}) > end ({end})")
-        between = TimeRestriction(start, end, catchup=True)
-
-        if align:
-            next_info = self.next_dagrun_info(None, between)
-        else:
-            yield start
-            next_info = self.next_dagrun_info(start, between)
-
-        while next_info is not None:
-            dagrun_start = next_info.data_interval.start
-            yield dagrun_start
-            next_info = self.next_dagrun_info(dagrun_start, between)
diff --git a/airflow/timetables/interval.py b/airflow/timetables/interval.py
index 6b3a46e..9fe4de2 100644
--- a/airflow/timetables/interval.py
+++ b/airflow/timetables/interval.py
@@ -20,7 +20,7 @@ from typing import Any, Optional
 
 from pendulum import DateTime
 
-from airflow.timetables.base import DagRunInfo, TimeRestriction, Timetable
+from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, 
Timetable
 from airflow.timetables.schedules import CronSchedule, Delta, DeltaSchedule, 
Schedule
 
 
@@ -43,6 +43,13 @@ class _DataIntervalTimetable(Timetable):
     def validate(self) -> None:
         self._schedule.validate()
 
+    def infer_data_interval(self, run_after: DateTime) -> DataInterval:
+        # Get the last complete period before run_after, e.g. if a DAG run is
+        # scheduled at each midnight, the data interval of a manually triggered
+        # run at 1am 25th is between 0am 24th and 0am 25th.
+        end = self._schedule.get_prev(self._schedule.align(run_after))
+        return DataInterval(start=self._schedule.get_prev(end), end=end)
+
     def next_dagrun_info(
         self,
         last_automated_dagrun: Optional[DateTime],
diff --git a/airflow/timetables/simple.py b/airflow/timetables/simple.py
index b6208a6..f9c062e 100644
--- a/airflow/timetables/simple.py
+++ b/airflow/timetables/simple.py
@@ -19,7 +19,7 @@ from typing import Any, Optional
 
 from pendulum import DateTime
 
-from airflow.timetables.base import DagRunInfo, TimeRestriction, Timetable
+from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, 
Timetable
 
 
 class NullTimetable(Timetable):
@@ -37,6 +37,9 @@ class NullTimetable(Timetable):
     def validate(self) -> None:
         pass
 
+    def infer_data_interval(self, run_after: DateTime) -> DataInterval:
+        return DataInterval.exact(run_after)
+
     def next_dagrun_info(
         self,
         last_automated_dagrun: Optional[DateTime],
@@ -60,6 +63,9 @@ class OnceTimetable(Timetable):
     def validate(self) -> None:
         pass
 
+    def infer_data_interval(self, run_after: DateTime) -> DataInterval:
+        return DataInterval.exact(run_after)
+
     def next_dagrun_info(
         self,
         last_automated_dagrun: Optional[DateTime],
diff --git a/airflow/utils/file.py b/airflow/utils/file.py
index c022079..f1c5b4d 100644
--- a/airflow/utils/file.py
+++ b/airflow/utils/file.py
@@ -73,6 +73,8 @@ def correct_maybe_zipped(fileloc):
     If the path contains a folder with a .zip suffix, then
     the folder is treated as a zip archive and path to zip is returned.
     """
+    if not fileloc:
+        return fileloc
     _, archive, _ = ZIP_REGEX.search(fileloc).groups()
     if archive and zipfile.is_zipfile(archive):
         return archive
diff --git a/docs/apache-airflow/migrations-ref.rst 
b/docs/apache-airflow/migrations-ref.rst
index 6d58055..0803557 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -23,7 +23,9 @@ Here's the list of all the Database Migrations that are 
executed via when you ru
 
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | Revision ID                    | Revises ID       | Airflow Version | 
Description                                                                     
      |
 
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
-| ``54bebd308c5f`` (head)        | ``97cdd93827b8`` |                 | Adds 
``trigger`` table and deferrable operator columns to task instance              
 |
+| ``142555e44c17`` (head)        | ``54bebd308c5f`` |                 | Add 
``data_interval_start`` and ``data_interval_end`` to ``DagRun``                 
  |
++--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
+| ``54bebd308c5f``               | ``97cdd93827b8`` |                 | Adds 
``trigger`` table and deferrable operator columns to task instance              
 |
 
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | ``97cdd93827b8``               | ``30867afad44a`` |                 | Add 
``queued_at`` column in ``dag_run`` table                                       
  |
 
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
diff --git a/docs/apache-airflow/templates-ref.rst 
b/docs/apache-airflow/templates-ref.rst
index 8616b68..0b3dd7c 100644
--- a/docs/apache-airflow/templates-ref.rst
+++ b/docs/apache-airflow/templates-ref.rst
@@ -23,7 +23,8 @@ Templates reference
 Variables, macros and filters can be used in templates (see the 
:ref:`concepts:jinja-templating` section)
 
 The following come for free out of the box with Airflow.
-Additional custom macros can be added globally through :doc:`plugins`, or at a 
DAG level through the ``DAG.user_defined_macros`` argument.
+Additional custom macros can be added globally through :doc:`plugins`, or at a 
DAG level through the
+``DAG.user_defined_macros`` argument.
 
 .. _templates:variables:
 
@@ -32,55 +33,76 @@ Variables
 The Airflow engine passes a few variables by default that are accessible
 in all templates
 
+==========================================  
====================================
+Variable                                    Description
+==========================================  
====================================
+``{{ data_interval_start }}``               Start of the data interval 
(`pendulum.Pendulum`_ or ``None``).
+``{{ data_interval_end }}``                 End of the data interval 
(`pendulum.Pendulum`_ or ``None``).
+``{{ ds }}``                                Start of the data interval as 
``YYYY-MM-DD``.
+                                            Same as ``{{ data_interval_start | 
ds }}``.
+``{{ ds_nodash }}``                         Start of the data interval as 
``YYYYMMDD``.
+                                            Same as ``{{ data_interval_start | 
ds_nodash }}``.
+``{{ ts }}``                                Same as ``{{ data_interval_start | 
ts }}``.
+                                            Example: 
``2018-01-01T00:00:00+00:00``.
+``{{ ts_nodash_with_tz }}``                 Same as ``{{ data_interval_start | 
ts_nodash_with_tz }}``.
+                                            Example: ``20180101T000000+0000``.
+``{{ ts_nodash }}``                         Same as ``{{ data_interval_start | 
ts_nodash }}``.
+                                            Example: ``20180101T000000``.
+``{{ prev_data_interval_start_success }}``  Start of the data interval from 
prior successful DAG run
+                                            (`pendulum.Pendulum`_ or ``None``).
+``{{ prev_data_interval_end_success }}``    End of the data interval from 
prior successful DAG run
+                                            (`pendulum.Pendulum`_ or ``None``).
+``{{ prev_start_date_success }}``           Start date from prior successful 
dag run (if available)
+                                            (`pendulum.Pendulum`_ or ``None``).
+``{{ dag }}``                               The DAG object.
+``{{ task }}``                              The Task object.
+``{{ macros }}``                            A reference to the macros package, 
described below.
+``{{ task_instance }}``                     The task_instance object.
+``{{ ti }}``                                Same as ``{{ task_instance }}``.
+``{{ params }}``                            A reference to the user-defined 
params dictionary which can be
+                                            overridden by the dictionary 
passed through ``trigger_dag -c`` if
+                                            you enabled 
``dag_run_conf_overrides_params`` in ``airflow.cfg``.
+``{{ var.value.my_var }}``                  Globally defined variables 
represented as a dictionary.
+``{{ var.json.my_var.path }}``              Globally defined variables 
represented as a dictionary.
+                                            With a deserialized JSON object, 
append the path to the key within
+                                            the JSON object.
+``{{ conn.my_conn_id }}``                   Connection represented as a 
dictionary.
+``{{ task_instance_key_str }}``             A unique, human-readable key to 
the task instance formatted as
+                                            
``{dag_id}__{task_id}__{ds_nodash}``.
+``{{ conf }}``                              The full configuration object 
located at
+                                            ``airflow.configuration.conf`` 
which represents the content of
+                                            your ``airflow.cfg``.
+``{{ run_id }}``                            The ``run_id`` of the current DAG 
run.
+``{{ dag_run }}``                           A reference to the DagRun object.
+``{{ test_mode }}``                         Whether the task instance was 
called using the CLI's test
+                                            subcommand.
+==========================================  
====================================
+
+The following variables are deprecated. They are kept for backward 
compatibility, but you should convert
+existing code to use other variables instead.
+
 =====================================   ====================================
-Variable                                Description
+Deprecated Variable                     Description
 =====================================   ====================================
-``{{ ds }}``                            the execution date as ``YYYY-MM-DD``
-``{{ ds_nodash }}``                     the execution date as ``YYYYMMDD``
-``{{ prev_ds }}``                       the previous execution date as 
``YYYY-MM-DD``
-                                        if ``{{ ds }}`` is ``2018-01-08`` and 
``schedule_interval`` is ``@weekly``,
-                                        ``{{ prev_ds }}`` will be 
``2018-01-01``
-``{{ prev_ds_nodash }}``                the previous execution date as 
``YYYYMMDD`` if exists, else ``None``
-``{{ next_ds }}``                       the next execution date as 
``YYYY-MM-DD``
-                                        if ``{{ ds }}`` is ``2018-01-01`` and 
``schedule_interval`` is ``@weekly``,
-                                        ``{{ next_ds }}`` will be 
``2018-01-08``
+``{{ execution_date }}``                the execution date (logical date), 
same as ``logical_date``
+``{{ next_execution_date }}``           the next execution date (if available) 
(`pendulum.Pendulum`_)
+                                        if ``{{ execution_date }}`` is 
``2018-01-01 00:00:00`` and
+                                        ``schedule_interval`` is ``@weekly``, 
``{{ next_execution_date }}``
+                                        will be ``2018-01-08 00:00:00``
+``{{ next_ds }}``                       the next execution date as 
``YYYY-MM-DD`` if exists, else ``None``
 ``{{ next_ds_nodash }}``                the next execution date as 
``YYYYMMDD`` if exists, else ``None``
+``{{ prev_execution_date }}``           the previous execution date (if 
available) (`pendulum.Pendulum`_)
+                                        if ``{{ execution_date }}`` is 
``2018-01-08 00:00:00`` and
+                                        ``schedule_interval`` is ``@weekly``, 
``{{ prev_execution_date }}``
+                                        will be ``2018-01-01 00:00:00``
+``{{ prev_ds }}``                       the previous execution date as 
``YYYY-MM-DD`` if exists, else ``None``
+``{{ prev_ds_nodash }}``                the previous execution date as 
``YYYYMMDD`` if exists, else ``None``
 ``{{ yesterday_ds }}``                  the day before the execution date as 
``YYYY-MM-DD``
 ``{{ yesterday_ds_nodash }}``           the day before the execution date as 
``YYYYMMDD``
 ``{{ tomorrow_ds }}``                   the day after the execution date as 
``YYYY-MM-DD``
 ``{{ tomorrow_ds_nodash }}``            the day after the execution date as 
``YYYYMMDD``
-``{{ ts }}``                            same as 
``execution_date.isoformat()``. Example: ``2018-01-01T00:00:00+00:00``
-``{{ ts_nodash }}``                     same as ``ts`` without ``-``, ``:`` 
and TimeZone info. Example: ``20180101T000000``
-``{{ ts_nodash_with_tz }}``             same as ``ts`` without ``-`` and 
``:``. Example: ``20180101T000000+0000``
-``{{ execution_date }}``                the execution_date (logical date) 
(`pendulum.Pendulum`_)
-``{{ prev_execution_date }}``           the previous execution date (if 
available) (`pendulum.Pendulum`_)
-``{{ prev_execution_date_success }}``   execution date from prior successful 
dag run (if available) (`pendulum.Pendulum`_)
-``{{ prev_start_date_success }}``       start date from prior successful dag 
run (if available) (`pendulum.Pendulum`_)
-``{{ next_execution_date }}``           the next execution date 
(`pendulum.Pendulum`_)
-``{{ dag }}``                           the DAG object
-``{{ task }}``                          the Task object
-``{{ macros }}``                        a reference to the macros package, 
described below
-``{{ task_instance }}``                 the task_instance object
-``{{ ti }}``                            same as ``{{ task_instance }}``
-``{{ params }}``                        a reference to the user-defined params 
dictionary which can be overridden by
-                                        the dictionary passed through 
``trigger_dag -c`` if you enabled
-                                        ``dag_run_conf_overrides_params`` in 
``airflow.cfg``
-``{{ var.value.my_var }}``              global defined variables represented 
as a dictionary
-``{{ var.json.my_var.path }}``          global defined variables represented 
as a dictionary
-                                        with deserialized JSON object, append 
the path to the
-                                        key within the JSON object
-``{{ conn.my_conn_id }}``               connection represented as a dictionary
-
-``{{ task_instance_key_str }}``         a unique, human-readable key to the 
task instance
-                                        formatted 
``{dag_id}__{task_id}__{ds_nodash}``
-``{{ conf }}``                          the full configuration object located 
at
-                                        ``airflow.configuration.conf`` which
-                                        represents the content of your
-                                        ``airflow.cfg``
-``{{ run_id }}``                        the ``run_id`` of the current DAG run
-``{{ dag_run }}``                       a reference to the DagRun object
-``{{ test_mode }}``                     whether the task instance was called 
using
-                                        the CLI's test subcommand
+``{{ prev_execution_date_success }}``   execution date from prior successful 
dag run
+
 =====================================   ====================================
 
 Note that you can access the object's attributes and methods with simple
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py 
b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 0aa13b2..03bd7d8 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -20,7 +20,7 @@ import pytest
 from parameterized import parameterized
 
 from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
-from airflow.models import DagModel, DagRun
+from airflow.models import DAG, DagModel, DagRun
 from airflow.security import permissions
 from airflow.utils import timezone
 from airflow.utils.session import create_session, provide_session
@@ -96,7 +96,8 @@ class TestDagRunEndpoint:
         dag_instance = DagModel(dag_id=dag_id)
         with create_session() as session:
             session.add(dag_instance)
-            session.commit()
+        dag = DAG(dag_id=dag_id, schedule_interval=None)
+        self.app.dag_bag.bag_dag(dag, root_dag=dag)
 
     def _create_test_dag_run(self, state='running', extra_dag=False, 
commit=True):
         dag_runs = []
diff --git a/tests/cli/commands/test_dag_command.py 
b/tests/cli/commands/test_dag_command.py
index 4a510f8..fd8404a 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -412,8 +412,34 @@ class TestCliDags(unittest.TestCase):
 
     def test_trigger_dag(self):
         dag_command.dag_trigger(
-            self.parser.parse_args(['dags', 'trigger', 
'example_bash_operator', '--conf', '{"foo": "bar"}'])
+            self.parser.parse_args(
+                [
+                    'dags',
+                    'trigger',
+                    'example_bash_operator',
+                    '--run-id=test_trigger_dag',
+                    '--exec-date=2021-06-04T09:00:00+08:00',
+                    '--conf={"foo": "bar"}',
+                ],
+            ),
         )
+        with create_session() as session:
+            dagrun = session.query(DagRun).filter(DagRun.run_id == 
"test_trigger_dag").one()
+
+        assert dagrun, "DagRun not created"
+        assert dagrun.run_type == DagRunType.MANUAL
+        assert dagrun.external_trigger
+        assert dagrun.conf == {"foo": "bar"}
+
+        # Coerced to UTC.
+        assert dagrun.execution_date.isoformat(timespec="seconds") == 
"2021-06-04T01:00:00+00:00"
+
+        # example_bash_operator runs every day at midnight, so the data 
interval
+        # should be aligned to the previous day.
+        assert dagrun.data_interval_start.isoformat(timespec="seconds") == 
"2021-06-03T00:00:00+00:00"
+        assert dagrun.data_interval_end.isoformat(timespec="seconds") == 
"2021-06-04T00:00:00+00:00"
+
+    def test_trigger_dag_invalid_conf(self):
         with pytest.raises(ValueError):
             dag_command.dag_trigger(
                 self.parser.parse_args(
diff --git a/tests/core/test_core.py b/tests/core/test_core.py
index fe31bd2..d8a5687 100644
--- a/tests/core/test_core.py
+++ b/tests/core/test_core.py
@@ -269,11 +269,13 @@ class TestCore:
         op.resolve_template_files()
 
     def test_task_get_template(self):
-        TI = TaskInstance
-        ti = TI(task=self.runme_0, execution_date=DEFAULT_DATE)
+        ti = TaskInstance(task=self.runme_0, execution_date=DEFAULT_DATE)
         ti.dag = self.dag_bash
         self.dag_bash.create_dagrun(
-            run_type=DagRunType.MANUAL, state=State.RUNNING, 
execution_date=DEFAULT_DATE
+            run_type=DagRunType.MANUAL,
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE,
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE + timedelta(days=1)),
         )
         ti.run(ignore_ti_state=True)
         context = ti.get_template_context()
@@ -282,23 +284,35 @@ class TestCore:
         assert context['ds'] == '2015-01-01'
         assert context['ds_nodash'] == '20150101'
 
-        # next_ds is 2015-01-02 as the dag interval is daily
+        # next_ds is 2015-01-02 as the dag schedule is daily.
         assert context['next_ds'] == '2015-01-02'
         assert context['next_ds_nodash'] == '20150102'
 
-        # prev_ds is 2014-12-31 as the dag interval is daily
-        assert context['prev_ds'] == '2014-12-31'
-        assert context['prev_ds_nodash'] == '20141231'
-
         assert context['ts'] == '2015-01-01T00:00:00+00:00'
         assert context['ts_nodash'] == '20150101T000000'
         assert context['ts_nodash_with_tz'] == '20150101T000000+0000'
 
-        assert context['yesterday_ds'] == '2014-12-31'
-        assert context['yesterday_ds_nodash'] == '20141231'
-
-        assert context['tomorrow_ds'] == '2015-01-02'
-        assert context['tomorrow_ds_nodash'] == '20150102'
+        assert context['data_interval_start'].isoformat() == 
'2015-01-01T00:00:00+00:00'
+        assert context['data_interval_end'].isoformat() == 
'2015-01-02T00:00:00+00:00'
+
+        # Test deprecated fields.
+        expected_deprecated_fields = [
+            ("prev_ds", "2014-12-31"),
+            ("prev_ds_nodash", "20141231"),
+            ("yesterday_ds", "2014-12-31"),
+            ("yesterday_ds_nodash", "20141231"),
+            ("tomorrow_ds", "2015-01-02"),
+            ("tomorrow_ds_nodash", "20150102"),
+        ]
+        for key, expected_value in expected_deprecated_fields:
+            message = (
+                f"Accessing {key!r} from the template is deprecated and "
+                f"will be removed in a future version."
+            )
+            with pytest.deprecated_call() as recorder:
+                value = str(context[key])  # Simulate template evaluation to 
trigger warning.
+            assert value == expected_value
+            assert [str(m.message) for m in recorder] == [message]
 
     def test_local_task_job(self):
         TI = TaskInstance
@@ -412,13 +426,10 @@ class TestCore:
         ti = TI(task=task, execution_date=execution_date)
         context = ti.get_template_context()
 
-        # next_ds/prev_ds should be the execution date for manually triggered 
runs
+        # next_ds should be the execution date for manually triggered runs
         assert context['next_ds'] == execution_ds
         assert context['next_ds_nodash'] == execution_ds_nodash
 
-        assert context['prev_ds'] == execution_ds
-        assert context['prev_ds_nodash'] == execution_ds_nodash
-
     def test_dag_params_and_task_params(self, dag_maker):
         # This test case guards how params of DAG and Operator work together.
         # - If any key exists in either DAG's or Operator's params,
diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py
index 25077fc..d28552a 100644
--- a/tests/decorators/test_python.py
+++ b/tests/decorators/test_python.py
@@ -268,6 +268,7 @@ class TestAirflowTaskDecorator(TestPythonBase):
         self.dag.create_dagrun(
             run_id=DagRunType.MANUAL,
             execution_date=DEFAULT_DATE,
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
             start_date=DEFAULT_DATE,
             state=State.RUNNING,
         )
@@ -299,6 +300,7 @@ class TestAirflowTaskDecorator(TestPythonBase):
         self.dag.create_dagrun(
             run_id=DagRunType.MANUAL,
             execution_date=DEFAULT_DATE,
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
             start_date=DEFAULT_DATE,
             state=State.RUNNING,
         )
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index d70606a..cf3b5c5 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -955,8 +955,8 @@ class TestBackfillJob:
                 # reached, so it is waiting
                 dag_run_created_cond.wait(timeout=1.5)
                 dagruns = DagRun.find(dag_id=dag_id)
-                dr = dagruns[0]
                 assert 1 == len(dagruns)
+                dr = dagruns[0]
                 assert dr.run_id == run_id
 
                 # allow the backfill to execute
@@ -1330,30 +1330,34 @@ class TestBackfillJob:
 
         session.close()
 
-    def test_dag_get_run_dates(self, dag_maker):
+    def test_dag_dagrun_infos_between(self, dag_maker):
         with dag_maker(
-            dag_id='test_get_dates', start_date=DEFAULT_DATE, 
schedule_interval="@hourly"
+            dag_id='dagrun_infos_between', start_date=DEFAULT_DATE, 
schedule_interval="@hourly"
         ) as test_dag:
             DummyOperator(
                 task_id='dummy',
                 owner='airflow',
             )
 
-        assert [DEFAULT_DATE] == test_dag.get_run_dates(
-            start_date=DEFAULT_DATE,
-            end_date=DEFAULT_DATE,
-            align=True,
-        )
+        assert [DEFAULT_DATE] == [
+            info.logical_date
+            for info in test_dag.iter_dagrun_infos_between(
+                earliest=DEFAULT_DATE,
+                latest=DEFAULT_DATE,
+            )
+        ]
         assert [
             DEFAULT_DATE - datetime.timedelta(hours=3),
             DEFAULT_DATE - datetime.timedelta(hours=2),
             DEFAULT_DATE - datetime.timedelta(hours=1),
             DEFAULT_DATE,
-        ] == test_dag.get_run_dates(
-            start_date=DEFAULT_DATE - datetime.timedelta(hours=3),
-            end_date=DEFAULT_DATE,
-            align=True,
-        )
+        ] == [
+            info.logical_date
+            for info in test_dag.iter_dagrun_infos_between(
+                earliest=DEFAULT_DATE - datetime.timedelta(hours=3),
+                latest=DEFAULT_DATE,
+            )
+        ]
 
     def test_backfill_run_backwards(self):
         dag = self.dagbag.get_dag("test_start_date_scheduling")
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index c41aca8..9030005 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1250,7 +1250,10 @@ class TestSchedulerJob:
         Test if a a dagrun would be scheduled if max_dag_runs has
         been reached but dagrun_timeout is also reached
         """
-        with 
dag_maker(dag_id='test_scheduler_verify_max_active_runs_and_dagrun_timeout') as 
dag:
+        with dag_maker(
+            dag_id='test_scheduler_verify_max_active_runs_and_dagrun_timeout',
+            start_date=DEFAULT_DATE,
+        ) as dag:
             DummyOperator(task_id='dummy')
         dag.max_active_runs = 1
         dag.dagrun_timeout = datetime.timedelta(seconds=60)
@@ -1278,6 +1281,8 @@ class TestSchedulerJob:
         assert orm_dag.next_dagrun_create_after
         # But we should record the date of _what run_ it would be
         assert isinstance(orm_dag.next_dagrun, datetime.datetime)
+        assert isinstance(orm_dag.next_dagrun_data_interval_start, 
datetime.datetime)
+        assert isinstance(orm_dag.next_dagrun_data_interval_end, 
datetime.datetime)
 
         # Should be scheduled as dagrun_timeout has passed
         dr.start_date = timezone.utcnow() - datetime.timedelta(days=1)
@@ -1294,6 +1299,8 @@ class TestSchedulerJob:
         assert dr.state == State.FAILED
         session.refresh(orm_dag)
         assert isinstance(orm_dag.next_dagrun, datetime.datetime)
+        assert isinstance(orm_dag.next_dagrun_data_interval_start, 
datetime.datetime)
+        assert isinstance(orm_dag.next_dagrun_data_interval_end, 
datetime.datetime)
         assert isinstance(orm_dag.next_dagrun_create_after, datetime.datetime)
 
         expected_callback = DagCallbackRequest(
@@ -1572,9 +1579,12 @@ class TestSchedulerJob:
             run_kwargs = {}
 
         dag = self.dagbag.get_dag(dag_id)
+        dagrun_info = dag.next_dagrun_info(None)
+        assert dagrun_info is not None
+
         dr = dag.create_dagrun(
             run_type=DagRunType.SCHEDULED,
-            execution_date=dag.next_dagrun_info(None)[0],
+            execution_date=dagrun_info.logical_date,
             state=State.RUNNING,
         )
 
@@ -2801,21 +2811,28 @@ class TestSchedulerJob:
         # Verify that dag_model.next_dagrun is equal to next execution_date
         dag_model = session.query(DagModel).get(dag.dag_id)
         assert dag_model.next_dagrun == DEFAULT_DATE
+        assert dag_model.next_dagrun_data_interval_start == DEFAULT_DATE
+        assert dag_model.next_dagrun_data_interval_end == DEFAULT_DATE + 
timedelta(minutes=1)
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         self.scheduler_job.executor = MockExecutor(do_update=False)
         self.scheduler_job.processor_agent = 
mock.MagicMock(spec=DagFileProcessorAgent)
 
-        # Verify a DagRun is created with the correct execution_date
+        # Verify a DagRun is created with the correct dates
         # when Scheduler._do_scheduling is run in the Scheduler Loop
         self.scheduler_job._do_scheduling(session)
         dr1 = dag.get_dagrun(DEFAULT_DATE, session=session)
         assert dr1 is not None
         assert dr1.state == State.RUNNING
+        assert dr1.execution_date == DEFAULT_DATE
+        assert dr1.data_interval_start == DEFAULT_DATE
+        assert dr1.data_interval_end == DEFAULT_DATE + timedelta(minutes=1)
 
-        # Verify that dag_model.next_dagrun is set to next execution_date
+        # Verify that dag_model.next_dagrun is set to next interval
         dag_model = session.query(DagModel).get(dag.dag_id)
         assert dag_model.next_dagrun == DEFAULT_DATE + timedelta(minutes=1)
+        assert dag_model.next_dagrun_data_interval_start == DEFAULT_DATE + 
timedelta(minutes=1)
+        assert dag_model.next_dagrun_data_interval_end == DEFAULT_DATE + 
timedelta(minutes=2)
 
         # Trigger the Dag externally
         dr = dag.create_dagrun(
@@ -2833,6 +2850,8 @@ class TestSchedulerJob:
         # triggered DagRun.
         dag_model = session.query(DagModel).get(dag.dag_id)
         assert dag_model.next_dagrun == DEFAULT_DATE + timedelta(minutes=1)
+        assert dag_model.next_dagrun_data_interval_start == DEFAULT_DATE + 
timedelta(minutes=1)
+        assert dag_model.next_dagrun_data_interval_end == DEFAULT_DATE + 
timedelta(minutes=2)
 
     def test_scheduler_create_dag_runs_check_existing_run(self, dag_maker):
         """
@@ -2887,7 +2906,9 @@ class TestSchedulerJob:
         # Test that this does not raise any error
         self.scheduler_job._create_dag_runs([dag_model], session)
 
-        # Assert dag_model.next_dagrun is set correctly to next execution date
+        # Assert the next dagrun fields are set correctly to next execution 
date
+        assert dag_model.next_dagrun_data_interval_start == DEFAULT_DATE + 
timedelta(days=1)
+        assert dag_model.next_dagrun_data_interval_end == DEFAULT_DATE + 
timedelta(days=2)
         assert dag_model.next_dagrun == DEFAULT_DATE + timedelta(days=1)
         session.rollback()
 
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 0b673b9..3807bbb 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -49,6 +49,7 @@ from airflow.operators.bash import BashOperator
 from airflow.operators.dummy import DummyOperator
 from airflow.operators.subdag import SubDagOperator
 from airflow.security import permissions
+from airflow.timetables.base import DagRunInfo
 from airflow.timetables.simple import NullTimetable, OnceTimetable
 from airflow.utils import timezone
 from airflow.utils.file import list_py_file_paths
@@ -1482,12 +1483,11 @@ class TestDag(unittest.TestCase):
             'test_scheduler_dagrun_once', start_date=timezone.datetime(2015, 
1, 1), schedule_interval="@once"
         )
 
-        next_date, _ = dag.next_dagrun_info(None)
+        next_info = dag.next_dagrun_info(None)
+        assert next_info and next_info.logical_date == timezone.datetime(2015, 
1, 1)
 
-        assert next_date == timezone.datetime(2015, 1, 1)
-
-        next_date, _ = dag.next_dagrun_info(next_date)
-        assert next_date is None
+        next_info = dag.next_dagrun_info(next_info.logical_date)
+        assert next_info is None
 
     def test_next_dagrun_info_start_end_dates(self):
         """
@@ -1506,15 +1506,16 @@ class TestDag(unittest.TestCase):
         dates = []
         date = None
         for _ in range(runs):
-            date, _ = dag.next_dagrun_info(date)
-            dates.append(date)
-
-        for date in dates:
-            assert date is not None
-
+            next_info = dag.next_dagrun_info(date)
+            if next_info is None:
+                dates.append(None)
+            else:
+                date = next_info.logical_date
+                dates.append(date)
+
+        assert all(date is not None for date in dates)
         assert dates[-1] == end_date
-
-        assert dag.next_dagrun_info(date)[0] is None
+        assert dag.next_dagrun_info(date) is None
 
     def test_next_dagrun_info_catchup(self):
         """
@@ -1596,12 +1597,12 @@ class TestDag(unittest.TestCase):
             catchup=False,
         )
 
-        next_date, _ = dag.next_dagrun_info(None)
-        assert next_date == timezone.datetime(2020, 1, 4)
+        next_info = dag.next_dagrun_info(None)
+        assert next_info and next_info.logical_date == timezone.datetime(2020, 
1, 4)
 
         # The date to create is in the future, this is handled by 
"DagModel.dags_needing_dagruns"
-        next_date, _ = dag.next_dagrun_info(next_date)
-        assert next_date == timezone.datetime(2020, 1, 5)
+        next_info = dag.next_dagrun_info(next_info.logical_date)
+        assert next_info and next_info.logical_date == timezone.datetime(2020, 
1, 5)
 
     @freeze_time(timezone.datetime(2020, 5, 4))
     def test_next_dagrun_info_timedelta_schedule_and_catchup_true(self):
@@ -1616,18 +1617,18 @@ class TestDag(unittest.TestCase):
             catchup=True,
         )
 
-        next_date, _ = dag.next_dagrun_info(None)
-        assert next_date == timezone.datetime(2020, 5, 1)
+        next_info = dag.next_dagrun_info(None)
+        assert next_info and next_info.logical_date == timezone.datetime(2020, 
5, 1)
 
-        next_date, _ = dag.next_dagrun_info(next_date)
-        assert next_date == timezone.datetime(2020, 5, 2)
+        next_info = dag.next_dagrun_info(next_info.logical_date)
+        assert next_info and next_info.logical_date == timezone.datetime(2020, 
5, 2)
 
-        next_date, _ = dag.next_dagrun_info(next_date)
-        assert next_date == timezone.datetime(2020, 5, 3)
+        next_info = dag.next_dagrun_info(next_info.logical_date)
+        assert next_info and next_info.logical_date == timezone.datetime(2020, 
5, 3)
 
         # The date to create is in the future, this is handled by 
"DagModel.dags_needing_dagruns"
-        next_date, _ = dag.next_dagrun_info(next_date)
-        assert next_date == timezone.datetime(2020, 5, 4)
+        next_info = dag.next_dagrun_info(next_info.logical_date)
+        assert next_info and next_info.logical_date == timezone.datetime(2020, 
5, 4)
 
     def test_next_dagrun_after_auto_align(self):
         """
@@ -1643,8 +1644,8 @@ class TestDag(unittest.TestCase):
         )
         DummyOperator(task_id='dummy', dag=dag, owner='airflow')
 
-        next_date, _ = dag.next_dagrun_info(None)
-        assert next_date == timezone.datetime(2016, 1, 2, 5, 4)
+        next_info = dag.next_dagrun_info(None)
+        assert next_info and next_info.logical_date == timezone.datetime(2016, 
1, 2, 5, 4)
 
         dag = DAG(
             dag_id='test_scheduler_auto_align_2',
@@ -1653,8 +1654,8 @@ class TestDag(unittest.TestCase):
         )
         DummyOperator(task_id='dummy', dag=dag, owner='airflow')
 
-        next_date, _ = dag.next_dagrun_info(None)
-        assert next_date == timezone.datetime(2016, 1, 1, 10, 10)
+        next_info = dag.next_dagrun_info(None)
+        assert next_info and next_info.logical_date == timezone.datetime(2016, 
1, 1, 10, 10)
 
     def test_next_dagrun_after_not_for_subdags(self):
         """
@@ -1693,11 +1694,11 @@ class TestDag(unittest.TestCase):
         subdag.parent_dag = dag
         subdag.is_subdag = True
 
-        next_date, _ = dag.next_dagrun_info(None)
-        assert next_date == timezone.datetime(2019, 1, 1, 0, 0)
+        next_parent_info = dag.next_dagrun_info(None)
+        assert next_parent_info.logical_date == timezone.datetime(2019, 1, 1, 
0, 0)
 
-        next_subdag_date, _ = subdag.next_dagrun_info(None)
-        assert next_subdag_date is None, "SubDags should never have DagRuns 
created by the scheduler"
+        next_subdag_info = subdag.next_dagrun_info(None)
+        assert next_subdag_info is None, "SubDags should never have DagRuns 
created by the scheduler"
 
     def test_replace_outdated_access_control_actions(self):
         outdated_permissions = {
@@ -2029,3 +2030,46 @@ def test_set_task_instance_state():
         assert dagrun.get_state() == State.QUEUED
 
     assert {t.key for t in altered} == {('test_set_task_instance_state', 
'task_1', start_date, 1)}
+
+
[email protected](
+    "start_date, expected_infos",
+    [
+        (
+            DEFAULT_DATE,
+            [DagRunInfo.interval(DEFAULT_DATE, DEFAULT_DATE + 
datetime.timedelta(hours=1))],
+        ),
+        (
+            DEFAULT_DATE - datetime.timedelta(hours=3),
+            [
+                DagRunInfo.interval(
+                    DEFAULT_DATE - datetime.timedelta(hours=3),
+                    DEFAULT_DATE - datetime.timedelta(hours=2),
+                ),
+                DagRunInfo.interval(
+                    DEFAULT_DATE - datetime.timedelta(hours=2),
+                    DEFAULT_DATE - datetime.timedelta(hours=1),
+                ),
+                DagRunInfo.interval(
+                    DEFAULT_DATE - datetime.timedelta(hours=1),
+                    DEFAULT_DATE,
+                ),
+                DagRunInfo.interval(
+                    DEFAULT_DATE,
+                    DEFAULT_DATE + datetime.timedelta(hours=1),
+                ),
+            ],
+        ),
+    ],
+    ids=["in-dag-restriction", "out-of-dag-restriction"],
+)
+def test_iter_dagrun_infos_between(start_date, expected_infos):
+    dag = DAG(dag_id='test_get_dates', start_date=DEFAULT_DATE, 
schedule_interval="@hourly")
+    DummyOperator(task_id='dummy', dag=dag)
+
+    iterator = dag.iter_dagrun_infos_between(
+        earliest=pendulum.instance(start_date),
+        latest=pendulum.instance(DEFAULT_DATE),
+        align=True,
+    )
+    assert expected_infos == list(iterator)
diff --git a/tests/models/test_taskinstance.py 
b/tests/models/test_taskinstance.py
index 93f6138..cf3952c 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -1446,13 +1446,20 @@ class TestTaskInstance:
             schedule_interval='0 12 * * *',
         )
 
-        ti = TI(task=task, execution_date=timezone.utcnow())
+        execution_date = timezone.utcnow()
+
+        dag.create_dagrun(
+            execution_date=execution_date,
+            state=State.RUNNING,
+            run_type=DagRunType.MANUAL,
+        )
+
+        ti = TI(task=task, execution_date=execution_date)
 
         template_context = ti.get_template_context()
 
-        assert isinstance(template_context["execution_date"], pendulum.DateTime)
-        assert isinstance(template_context["next_execution_date"], pendulum.DateTime)
-        assert isinstance(template_context["prev_execution_date"], pendulum.DateTime)
+        assert isinstance(template_context["data_interval_start"], pendulum.DateTime)
+        assert isinstance(template_context["data_interval_end"], pendulum.DateTime)
 
     @pytest.mark.parametrize(
         "content, expected_output",
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index 5828f1f..82fbd22 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -162,6 +162,7 @@ class TestPythonOperator(TestPythonBase):
         self.dag.create_dagrun(
             run_type=DagRunType.MANUAL,
             execution_date=DEFAULT_DATE,
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
             start_date=DEFAULT_DATE,
             state=State.RUNNING,
         )
@@ -199,6 +200,7 @@ class TestPythonOperator(TestPythonBase):
         self.dag.create_dagrun(
             run_type=DagRunType.MANUAL,
             execution_date=DEFAULT_DATE,
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
             start_date=DEFAULT_DATE,
             state=State.RUNNING,
         )
diff --git a/tests/operators/test_subdag_operator.py b/tests/operators/test_subdag_operator.py
index 9ebdc55..dcdb508 100644
--- a/tests/operators/test_subdag_operator.py
+++ b/tests/operators/test_subdag_operator.py
@@ -148,13 +148,20 @@ class TestSubDagOperator(unittest.TestCase):
         subdag_task._get_dagrun = Mock()
         subdag_task._get_dagrun.side_effect = [None, self.dag_run_success, self.dag_run_success]
 
-        subdag_task.pre_execute(context={'execution_date': DEFAULT_DATE})
-        subdag_task.execute(context={'execution_date': DEFAULT_DATE})
-        subdag_task.post_execute(context={'execution_date': DEFAULT_DATE})
+        context = {
+            'data_interval_start': None,
+            'data_interval_end': None,
+            'execution_date': DEFAULT_DATE,
+        }
+
+        subdag_task.pre_execute(context=context)
+        subdag_task.execute(context=context)
+        subdag_task.post_execute(context=context)
 
         subdag.create_dagrun.assert_called_once_with(
             run_type=DagRunType.SCHEDULED,
             execution_date=DEFAULT_DATE,
+            data_interval=None,
             conf=None,
             state=State.RUNNING,
             external_trigger=True,
@@ -178,13 +185,20 @@ class TestSubDagOperator(unittest.TestCase):
         subdag_task._get_dagrun = Mock()
         subdag_task._get_dagrun.side_effect = [None, self.dag_run_success, self.dag_run_success]
 
-        subdag_task.pre_execute(context={'execution_date': DEFAULT_DATE})
-        subdag_task.execute(context={'execution_date': DEFAULT_DATE})
-        subdag_task.post_execute(context={'execution_date': DEFAULT_DATE})
+        context = {
+            'data_interval_start': None,
+            'data_interval_end': None,
+            'execution_date': DEFAULT_DATE,
+        }
+
+        subdag_task.pre_execute(context=context)
+        subdag_task.execute(context=context)
+        subdag_task.post_execute(context=context)
 
         subdag.create_dagrun.assert_called_once_with(
             run_type=DagRunType.SCHEDULED,
             execution_date=DEFAULT_DATE,
+            data_interval=None,
             conf=conf,
             state=State.RUNNING,
             external_trigger=True,
@@ -206,10 +220,16 @@ class TestSubDagOperator(unittest.TestCase):
         subdag_task._get_dagrun = Mock()
         subdag_task._get_dagrun.side_effect = [None, self.dag_run_failed, self.dag_run_failed]
 
+        context = {
+            'data_interval_start': None,
+            'data_interval_end': None,
+            'execution_date': DEFAULT_DATE,
+        }
+
         with pytest.raises(AirflowException):
-            subdag_task.pre_execute(context={'execution_date': DEFAULT_DATE})
-            subdag_task.execute(context={'execution_date': DEFAULT_DATE})
-            subdag_task.post_execute(context={'execution_date': DEFAULT_DATE})
+            subdag_task.pre_execute(context=context)
+            subdag_task.execute(context=context)
+            subdag_task.post_execute(context=context)
 
     def test_execute_skip_if_dagrun_success(self):
         """
@@ -223,9 +243,15 @@ class TestSubDagOperator(unittest.TestCase):
         subdag_task._get_dagrun = Mock()
         subdag_task._get_dagrun.return_value = self.dag_run_success
 
-        subdag_task.pre_execute(context={'execution_date': DEFAULT_DATE})
-        subdag_task.execute(context={'execution_date': DEFAULT_DATE})
-        subdag_task.post_execute(context={'execution_date': DEFAULT_DATE})
+        context = {
+            'data_interval_start': None,
+            'data_interval_end': None,
+            'execution_date': DEFAULT_DATE,
+        }
+
+        subdag_task.pre_execute(context=context)
+        subdag_task.execute(context=context)
+        subdag_task.post_execute(context=context)
 
         subdag.create_dagrun.assert_not_called()
         assert 3 == len(subdag_task._get_dagrun.mock_calls)
diff --git a/tests/sensors/test_base.py b/tests/sensors/test_base.py
index f9e0ebf..dd3bf29 100644
--- a/tests/sensors/test_base.py
+++ b/tests/sensors/test_base.py
@@ -421,9 +421,8 @@ class TestBaseSensor(unittest.TestCase):
         # poke returns False and AirflowRescheduleException is raised
         date1 = timezone.utcnow()
         with freeze_time(date1):
-            dates = self.dag.get_run_dates(DEFAULT_DATE, end_date=DEFAULT_DATE, align=True)
-            for date in dates:
-                TaskInstance(sensor, date).run(ignore_ti_state=True, test_mode=True)
+            for info in self.dag.iter_dagrun_infos_between(DEFAULT_DATE, DEFAULT_DATE):
+                TaskInstance(sensor, info.logical_date).run(ignore_ti_state=True, test_mode=True)
         tis = dr.get_task_instances()
         assert len(tis) == 2
         for ti in tis:
diff --git a/tests/sensors/test_python.py b/tests/sensors/test_python.py
index 37f0114..c4aecd4 100644
--- a/tests/sensors/test_python.py
+++ b/tests/sensors/test_python.py
@@ -76,6 +76,7 @@ class TestPythonSensor(TestPythonBase):
         self.dag.create_dagrun(
             run_type=DagRunType.MANUAL,
             execution_date=DEFAULT_DATE,
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
             start_date=DEFAULT_DATE,
             state=State.RUNNING,
         )
@@ -117,6 +118,7 @@ class TestPythonSensor(TestPythonBase):
         self.dag.create_dagrun(
             run_type=DagRunType.MANUAL,
             execution_date=DEFAULT_DATE,
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
             start_date=DEFAULT_DATE,
             state=State.RUNNING,
         )
diff --git a/tests/test_utils/perf/scheduler_dag_execution_timing.py b/tests/test_utils/perf/scheduler_dag_execution_timing.py
index 0998a97..2adbd5f 100755
--- a/tests/test_utils/perf/scheduler_dag_execution_timing.py
+++ b/tests/test_utils/perf/scheduler_dag_execution_timing.py
@@ -166,7 +166,7 @@ def create_dag_runs(dag, num_runs, session):
     last_dagrun_at = None
     for _ in range(num_runs):
         next_info = dag.next_dagrun_info(last_dagrun_at)
-        last_dagrun_at = next_info.data_interval.start
+        last_dagrun_at = next_info.logical_date
         dag.create_dagrun(
             run_id=f"{id_prefix}{last_dagrun_at.isoformat()}",
             execution_date=last_dagrun_at,
diff --git a/tests/timetables/test_time_table_iter_ranges.py 
b/tests/timetables/test_time_table_iter_ranges.py
deleted file mode 100644
index c9ee747..0000000
--- a/tests/timetables/test_time_table_iter_ranges.py
+++ /dev/null
@@ -1,38 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""Tests for Timetable.iter_between()."""
-
-from datetime import datetime, timedelta
-
-import pytest
-
-from airflow.settings import TIMEZONE
-from airflow.timetables.interval import DeltaDataIntervalTimetable
-
-
-@pytest.fixture()
-def timetable_1s():
-    return DeltaDataIntervalTimetable(timedelta(seconds=1))
-
-
-def test_end_date_before_start_date(timetable_1s):
-    start = datetime(2016, 2, 1, tzinfo=TIMEZONE)
-    end = datetime(2016, 1, 1, tzinfo=TIMEZONE)
-    message = r"start \([- :+\d]{25}\) > end \([- :+\d]{25}\)"
-    with pytest.raises(ValueError, match=message):
-        list(timetable_1s.iter_between(start, end, align=True))
diff --git a/tests/www/views/test_views_trigger_dag.py b/tests/www/views/test_views_trigger_dag.py
index 5540328..2ef3fd5 100644
--- a/tests/www/views/test_views_trigger_dag.py
+++ b/tests/www/views/test_views_trigger_dag.py
@@ -15,6 +15,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import datetime
 import json
 
 import pytest
@@ -100,7 +101,7 @@ def test_trigger_dag_wrong_execution_date(admin_client):
     assert run is None
 
 
-def test_trigger_dag_execution_date(admin_client):
+def test_trigger_dag_execution_date_data_interval(admin_client):
     test_dag_id = "example_bash_operator"
     exec_date = timezone.utcnow()
 
@@ -113,6 +114,12 @@ def test_trigger_dag_execution_date(admin_client):
     assert run.run_type == DagRunType.MANUAL
     assert run.execution_date == exec_date
 
+    # Since example_bash_operator runs once per day, the data interval should be
+    # between midnight yesterday and midnight today.
+    today_midnight = exec_date.replace(hour=0, minute=0, second=0, microsecond=0)
+    assert run.data_interval_start == (today_midnight - datetime.timedelta(days=1))
+    assert run.data_interval_end == today_midnight
+
 
 def test_trigger_dag_form(admin_client):
     test_dag_id = "example_bash_operator"

Reply via email to