This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4556828 AIP-39: DagRun.data_interval_start|end (#16352)
4556828 is described below
commit 4556828dccba2df0ab26b0aeeb26d0a30944da6f
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Thu Aug 12 22:16:57 2021 +0800
AIP-39: DagRun.data_interval_start|end (#16352)
Co-authored-by: Ash Berlin-Taylor <[email protected]>
---
airflow/api/common/experimental/mark_tasks.py | 4 +-
.../api_connexion/endpoints/dag_run_endpoint.py | 19 +-
airflow/jobs/backfill_job.py | 47 +--
airflow/jobs/scheduler_job.py | 5 +-
...ta_interval_start_end_to_dagmodel_and_dagrun.py | 77 +++++
airflow/models/baseoperator.py | 7 +-
airflow/models/dag.py | 168 ++++++++---
airflow/models/dagrun.py | 12 +-
airflow/models/taskinstance.py | 318 ++++++++++++++-------
airflow/operators/python.py | 7 +-
airflow/operators/subdag.py | 8 +-
airflow/timetables/base.py | 56 ++--
airflow/timetables/interval.py | 9 +-
airflow/timetables/simple.py | 8 +-
airflow/utils/file.py | 2 +
docs/apache-airflow/migrations-ref.rst | 4 +-
docs/apache-airflow/templates-ref.rst | 108 ++++---
.../endpoints/test_dag_run_endpoint.py | 5 +-
tests/cli/commands/test_dag_command.py | 28 +-
tests/core/test_core.py | 45 +--
tests/decorators/test_python.py | 2 +
tests/jobs/test_backfill_job.py | 30 +-
tests/jobs/test_scheduler_job.py | 31 +-
tests/models/test_dag.py | 110 ++++---
tests/models/test_taskinstance.py | 15 +-
tests/operators/test_python.py | 2 +
tests/operators/test_subdag_operator.py | 50 +++-
tests/sensors/test_base.py | 5 +-
tests/sensors/test_python.py | 2 +
.../perf/scheduler_dag_execution_timing.py | 2 +-
tests/timetables/test_time_table_iter_ranges.py | 38 ---
tests/www/views/test_views_trigger_dag.py | 9 +-
32 files changed, 848 insertions(+), 385 deletions(-)
diff --git a/airflow/api/common/experimental/mark_tasks.py
b/airflow/api/common/experimental/mark_tasks.py
index 9a99d4d..30ecd42 100644
--- a/airflow/api/common/experimental/mark_tasks.py
+++ b/airflow/api/common/experimental/mark_tasks.py
@@ -257,7 +257,9 @@ def get_execution_dates(dag, execution_date, future, past):
dag_runs = dag.get_dagruns_between(start_date=start_date,
end_date=end_date)
dates = sorted({d.execution_date for d in dag_runs})
else:
- dates = dag.get_run_dates(start_date, end_date, align=False)
+ dates = [
+ info.logical_date for info in
dag.iter_dagrun_infos_between(start_date, end_date, align=False)
+ ]
return dates
diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py
b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index 27d13b6..80d04ab 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -31,6 +31,7 @@ from airflow.api_connexion.schemas.dag_run_schema import (
from airflow.models import DagModel, DagRun
from airflow.security import permissions
from airflow.utils.session import provide_session
+from airflow.utils.state import State
from airflow.utils.types import DagRunType
@@ -242,21 +243,29 @@ def post_dag_run(dag_id, session):
except ValidationError as err:
raise BadRequest(detail=str(err))
+ execution_date = post_body["execution_date"]
+ run_id = post_body["run_id"]
dagrun_instance = (
session.query(DagRun)
.filter(
DagRun.dag_id == dag_id,
- or_(DagRun.run_id == post_body["run_id"], DagRun.execution_date ==
post_body["execution_date"]),
+ or_(DagRun.run_id == run_id, DagRun.execution_date ==
execution_date),
)
.first()
)
if not dagrun_instance:
- dag_run = DagRun(dag_id=dag_id, run_type=DagRunType.MANUAL,
**post_body)
- session.add(dag_run)
- session.commit()
+ dag_run = current_app.dag_bag.get_dag(dag_id).create_dagrun(
+ run_type=DagRunType.MANUAL,
+ run_id=run_id,
+ execution_date=execution_date,
+ state=State.QUEUED,
+ conf=post_body.get("conf"),
+ external_trigger=True,
+ dag_hash=current_app.dag_bag.dags_hash.get(dag_id),
+ )
return dagrun_schema.dump(dag_run)
- if dagrun_instance.execution_date == post_body["execution_date"]:
+ if dagrun_instance.execution_date == execution_date:
raise AlreadyExists(
detail=f"DAGRun with DAG ID: '{dag_id}' and "
f"DAGRun ExecutionDate: '{post_body['execution_date']}' already
exists"
diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py
index 880bdaa..5a8c77b 100644
--- a/airflow/jobs/backfill_job.py
+++ b/airflow/jobs/backfill_job.py
@@ -19,9 +19,9 @@
import time
from collections import OrderedDict
-from datetime import datetime
from typing import Optional, Set
+import pendulum
from sqlalchemy import and_
from sqlalchemy.orm.session import Session, make_transient
from tabulate import tabulate
@@ -42,6 +42,7 @@ from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import BACKFILL_QUEUED_DEPS
+from airflow.timetables.base import DagRunInfo
from airflow.utils import helpers, timezone
from airflow.utils.configuration import tmp_configuration_copy
from airflow.utils.session import provide_session
@@ -283,17 +284,19 @@ class BackfillJob(BaseJob):
ti.handle_failure_with_callback(error=msg)
@provide_session
- def _get_dag_run(self, run_date: datetime, dag: DAG, session: Session =
None):
+ def _get_dag_run(self, dagrun_info: DagRunInfo, dag: DAG, session: Session
= None):
"""
Returns a dag run for the given run date, which will be matched to an
existing
dag run if available or create a new dag run otherwise. If the
max_active_runs
limit is reached, this function will return None.
- :param run_date: the execution date for the dag run
+ :param dagrun_info: Schedule information for the dag run
:param dag: DAG
:param session: the database session object
:return: a DagRun in state RUNNING or None
"""
+ run_date = dagrun_info.logical_date
+
# consider max_active_runs but ignore when running subdags
respect_dag_max_active_limit = bool(dag.schedule_interval and not
dag.is_subdag)
@@ -317,6 +320,7 @@ class BackfillJob(BaseJob):
run = run or dag.create_dagrun(
execution_date=run_date,
+ data_interval=dagrun_info.data_interval,
start_date=timezone.utcnow(),
state=State.RUNNING,
external_trigger=False,
@@ -690,14 +694,14 @@ class BackfillJob(BaseJob):
return err
@provide_session
- def _execute_for_run_dates(self, run_dates, ti_status, executor,
pickle_id, start_date, session=None):
+ def _execute_dagruns(self, dagrun_infos, ti_status, executor, pickle_id,
start_date, session=None):
"""
Computes the dag runs and their respective task instances for
the given run dates and executes the task instances.
Returns a list of execution dates of the dag runs that were executed.
- :param run_dates: Execution dates for dag runs
- :type run_dates: list
+ :param dagrun_infos: Schedule information for dag runs
+ :type dagrun_infos: list[DagRunInfo]
:param ti_status: internal BackfillJob status structure to tis track
progress
:type ti_status: BackfillJob._DagRunTaskStatus
:param executor: the executor to use, it must be previously started
@@ -709,9 +713,9 @@ class BackfillJob(BaseJob):
:param session: the current session object
:type session: sqlalchemy.orm.session.Session
"""
- for next_run_date in run_dates:
+ for dagrun_info in dagrun_infos:
for dag in [self.dag] + self.dag.subdags:
- dag_run = self._get_dag_run(next_run_date, dag,
session=session)
+ dag_run = self._get_dag_run(dagrun_info, dag, session=session)
tis_map = self._task_instances_for_dag_run(dag_run,
session=session)
if dag_run is None:
continue
@@ -755,8 +759,13 @@ class BackfillJob(BaseJob):
start_date = self.bf_start_date
- # Get intervals between the start/end dates, which will turn into dag
runs
- run_dates = self.dag.get_run_dates(start_date=start_date,
end_date=self.bf_end_date, align=True)
+ # Get DagRun schedule between the start/end dates, which will turn
into dag runs.
+ dagrun_start_date = timezone.coerce_datetime(start_date)
+ if self.bf_end_date is None:
+ dagrun_end_date = pendulum.now(timezone.utc)
+ else:
+ dagrun_end_date = pendulum.instance(self.bf_end_date)
+ dagrun_infos =
list(self.dag.iter_dagrun_infos_between(dagrun_start_date, dagrun_end_date))
if self.run_backwards:
tasks_that_depend_on_past = [t.task_id for t in
self.dag.task_dict.values() if t.depends_on_past]
if tasks_that_depend_on_past:
@@ -765,9 +774,10 @@ class BackfillJob(BaseJob):
",".join(tasks_that_depend_on_past)
)
)
- run_dates = run_dates[::-1]
+ dagrun_infos = dagrun_infos[::-1]
- if len(run_dates) == 0:
+ dagrun_info_count = len(dagrun_infos)
+ if dagrun_info_count == 0:
self.log.info("No run dates were found for the given dates and dag
interval.")
return
@@ -788,17 +798,18 @@ class BackfillJob(BaseJob):
executor.job_id = "backfill"
executor.start()
- ti_status.total_runs = len(run_dates) # total dag runs in backfill
+ ti_status.total_runs = dagrun_info_count # total dag runs in backfill
try:
remaining_dates = ti_status.total_runs
while remaining_dates > 0:
- dates_to_process = [
- run_date for run_date in run_dates if run_date not in
ti_status.executed_dag_run_dates
+ dagrun_infos_to_process = [
+ dagrun_info
+ for dagrun_info in dagrun_infos
+ if dagrun_info.logical_date not in
ti_status.executed_dag_run_dates
]
-
- self._execute_for_run_dates(
- run_dates=dates_to_process,
+ self._execute_dagruns(
+ dagrun_infos=dagrun_infos_to_process,
ti_status=ti_status,
executor=executor,
pickle_id=pickle_id,
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index c254200..e0845c6 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -953,14 +953,13 @@ class SchedulerJob(BaseJob):
run_type=DagRunType.SCHEDULED,
execution_date=dag_model.next_dagrun,
state=State.QUEUED,
+ data_interval=dag_model.next_dagrun_data_interval,
external_trigger=False,
session=session,
dag_hash=dag_hash,
creating_job_id=self.id,
)
- dag_model.next_dagrun, dag_model.next_dagrun_create_after =
dag.next_dagrun_info(
- dag_model.next_dagrun
- )
+ dag_model.calculate_dagrun_date_fields(dag, dag_model.next_dagrun,
0)
# TODO[HA]: Should we do a session.flush() so we don't have to keep
lots of state/object in
# memory for larger dags? or expunge_all()
diff --git
a/airflow/migrations/versions/142555e44c17_add_data_interval_start_end_to_dagmodel_and_dagrun.py
b/airflow/migrations/versions/142555e44c17_add_data_interval_start_end_to_dagmodel_and_dagrun.py
new file mode 100644
index 0000000..369e6a3
--- /dev/null
+++
b/airflow/migrations/versions/142555e44c17_add_data_interval_start_end_to_dagmodel_and_dagrun.py
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add data_interval_[start|end] to DagModel and DagRun.
+
+Revision ID: 142555e44c17
+Revises: e9304a3141f0
+Create Date: 2021-06-09 08:28:02.089817
+
+"""
+
+from alembic import op
+from sqlalchemy import TIMESTAMP, Column
+from sqlalchemy.dialects import mssql, mysql
+
+# Revision identifiers, used by Alembic.
+revision = "142555e44c17"
+down_revision = "54bebd308c5f"
+branch_labels = None
+depends_on = None
+
+
+def _use_date_time2(conn):
+ result = conn.execute(
+ """SELECT CASE WHEN CONVERT(VARCHAR(128), SERVERPROPERTY
('productversion'))
+ like '8%' THEN '2000' WHEN CONVERT(VARCHAR(128), SERVERPROPERTY
('productversion'))
+ like '9%' THEN '2005' ELSE '2005Plus' END AS MajorVersion"""
+ ).fetchone()
+ mssql_version = result[0]
+ return mssql_version not in ("2000", "2005")
+
+
+def _get_timestamp(conn):
+ dialect_name = conn.dialect.name
+ if dialect_name == "mysql":
+ return mysql.TIMESTAMP(fsp=6, timezone=True)
+ if dialect_name != "mssql":
+ return TIMESTAMP(timezone=True)
+ if _use_date_time2(conn):
+ return mssql.DATETIME2(precision=6)
+ return mssql.DATETIME
+
+
+def upgrade():
+ """Apply data_interval fields to DagModel and DagRun."""
+ column_type = _get_timestamp(op.get_bind())
+ with op.batch_alter_table("dag_run") as batch_op:
+ batch_op.add_column(Column("data_interval_start", column_type))
+ batch_op.add_column(Column("data_interval_end", column_type))
+ with op.batch_alter_table("dag") as batch_op:
+ batch_op.add_column(Column("next_dagrun_data_interval_start",
column_type))
+ batch_op.add_column(Column("next_dagrun_data_interval_end",
column_type))
+
+
+def downgrade():
+ """Unapply data_interval fields to DagModel and DagRun."""
+ with op.batch_alter_table("dag_run") as batch_op:
+ batch_op.drop_column("data_interval_start")
+ batch_op.drop_column("data_interval_end")
+ with op.batch_alter_table("dag") as batch_op:
+ batch_op.drop_column("next_dagrun_data_interval_start")
+ batch_op.drop_column("next_dagrun_data_interval_end")
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index f83a3dc..e1cd6ae 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -1259,10 +1259,11 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin,
metaclass=BaseOperatorMeta
start_date = start_date or self.start_date
end_date = end_date or self.end_date or timezone.utcnow()
- for execution_date in self.dag.get_run_dates(start_date, end_date,
align=False):
- TaskInstance(self, execution_date).run(
+ for info in self.dag.iter_dagrun_infos_between(start_date, end_date,
align=False):
+ ignore_depends_on_past = info.logical_date == start_date and
ignore_first_depends_on_past
+ TaskInstance(self, info.logical_date).run(
mark_success=mark_success,
- ignore_depends_on_past=(execution_date == start_date and
ignore_first_depends_on_past),
+ ignore_depends_on_past=ignore_depends_on_past,
ignore_ti_state=ignore_ti_state,
)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 82c4d58..fdfec18 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -70,7 +70,7 @@ from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import Context, TaskInstance,
TaskInstanceKey, clear_task_instances
from airflow.security import permissions
from airflow.stats import Stats
-from airflow.timetables.base import TimeRestriction, Timetable
+from airflow.timetables.base import DagRunInfo, TimeRestriction, Timetable
from airflow.timetables.interval import CronDataIntervalTimetable,
DeltaDataIntervalTimetable
from airflow.timetables.schedules import Schedule
from airflow.timetables.simple import NullTimetable, OnceTimetable
@@ -479,8 +479,11 @@ class DAG(LoggingMixin):
if num is not None:
result = utils_date_range(start_date=start_date, num=num)
else:
- message += " Please use `DAG.get_run_dates(..., align=False)`
instead."
- result = self.get_run_dates(start_date, end_date, align=False)
+ message += " Please use `DAG.iter_dagrun_infos_between(...,
align=False)` instead."
+ result = [
+ info.logical_date
+ for info in self.iter_dagrun_infos_between(start_date,
end_date, align=False)
+ ]
warnings.warn(message, category=DeprecationWarning, stacklevel=2)
return result
@@ -524,7 +527,7 @@ class DAG(LoggingMixin):
def next_dagrun_info(
self,
date_last_automated_dagrun: Optional[pendulum.DateTime],
- ) -> Tuple[Optional[pendulum.DateTime], Optional[pendulum.DateTime]]:
+ ) -> Optional[DagRunInfo]:
"""Get information about the next DagRun of this dag after
``date_last_automated_dagrun``.
This calculates what time interval the next DagRun should operate on
@@ -537,22 +540,20 @@ class DAG(LoggingMixin):
:param date_last_automated_dagrun: The ``max(execution_date)`` of
existing "automated" DagRuns for this dag (scheduled or backfill,
but not manual).
- :return: A 2-tuple containing the DagRun's execution date, and the
- earliest it could be scheduled.
+ :return: DagRunInfo of the next dagrun, or None if a dagrun is not
+ going to be scheduled.
"""
- # XXX: The timezone.coerce_datetime calls in this function should not
- # be necessary since the function annotation suggests it only accepts
- # pendulum.DateTime, and someone is passing datetime.datetime into this
- # function. We should fix whatever is doing that.
+ # Never schedule a subdag. It will be scheduled by its parent dag.
if self.is_subdag:
- return (None, None)
- next_info = self.timetable.next_dagrun_info(
+ return None
+ # XXX: The timezone.coerce_datetime calls should not be necessary since
+ # the function annotation suggests it only accepts pendulum.DateTime,
+ # and someone is passing datetime.datetime into this function. We
should
+ # fix whatever is doing that.
+ return self.timetable.next_dagrun_info(
timezone.coerce_datetime(date_last_automated_dagrun),
self._time_restriction,
)
- if next_info is None:
- return (None, None)
- return (next_info.data_interval.start, next_info.run_after)
def next_dagrun_after_date(self, date_last_automated_dagrun:
Optional[pendulum.DateTime]):
warnings.warn(
@@ -560,7 +561,10 @@ class DAG(LoggingMixin):
category=DeprecationWarning,
stacklevel=2,
)
- return self.next_dagrun_info(date_last_automated_dagrun)[0]
+ info = self.next_dagrun_info(date_last_automated_dagrun)
+ if info is None:
+ return None
+ return info.run_after
@cached_property
def _time_restriction(self) -> TimeRestriction:
@@ -594,7 +598,64 @@ class DAG(LoggingMixin):
type_name = type(interval).__name__
raise TypeError(f"{type_name} is not a valid DAG.schedule_interval.")
- def get_run_dates(self, start_date, end_date=None, *, align: bool = True):
+ def iter_dagrun_infos_between(
+ self,
+ earliest: Optional[pendulum.DateTime],
+ latest: pendulum.DateTime,
+ *,
+ align: bool = True,
+ ) -> Iterable[DagRunInfo]:
+ """Yield DagRunInfo using this DAG's timetable between given interval.
+
+ DagRunInfo instances yielded if their ``logical_date`` is not earlier
+ than ``earliest``, nor later than ``latest``. The instances are ordered
+ by their ``logical_date`` from earliest to latest.
+
+ If ``align`` is ``False``, the first run will happen immediately on
+ ``earliest``, even if it does not fall on the logical timetable
schedule.
+ The default is ``True``, but subdags will ignore this value and always
+ behave as if this is set to ``False`` for backward compatibility.
+
+ Example: A DAG is scheduled to run every midnight (``0 0 * * *``). If
+ ``earliest`` is ``2021-06-03 23:00:00``, the first DagRunInfo would be
+ ``2021-06-03 23:00:00`` if ``align=False``, and ``2021-06-04 00:00:00``
+ if ``align=True``.
+ """
+ if earliest is None:
+ earliest = self._time_restriction.earliest
+ earliest = timezone.coerce_datetime(earliest)
+ latest = timezone.coerce_datetime(latest)
+
+ restriction = TimeRestriction(earliest, latest, catchup=True)
+
+ # HACK: Sub-DAGs are currently scheduled differently. For example, say
+ # the schedule is @daily and start is 2021-06-03 22:16:00, a top-level
+ # DAG should be first scheduled to run on midnight 2021-06-04, but a
+ # sub-DAG should be first scheduled to run RIGHT NOW. We can change
+ # this, but since sub-DAGs are going away in 3.0 anyway, let's keep
+ # compatibility for now and remove this entirely later.
+ if self.is_subdag:
+ align = False
+
+ info = self.timetable.next_dagrun_info(None, restriction)
+ if info is None:
+ # No runs to be scheduled between the user-supplied timeframe. But
+ # if align=False, "invent" a data interval for the timeframe
itself.
+ if not align:
+ yield DagRunInfo.interval(earliest, latest)
+ return
+
+ # If align=False and earliest does not fall on the timetable's logical
+ # schedule, "invent" a data interval for it.
+ if not align and info.logical_date != earliest:
+ yield DagRunInfo.interval(earliest, info.data_interval.start)
+
+ # Generate naturally according to schedule.
+ while info is not None:
+ yield info
+ info = self.timetable.next_dagrun_info(info.logical_date,
restriction)
+
+ def get_run_dates(self, start_date, end_date=None):
"""
Returns a list of dates between the interval received as parameter
using this
dag's schedule interval. Returned dates can be used for execution
dates.
@@ -603,31 +664,20 @@ class DAG(LoggingMixin):
:type start_date: datetime
:param end_date: The end date of the interval. Defaults to
``timezone.utcnow()``.
:type end_date: datetime
- :param align: Whether the first run should be delayed to "align" with
- the schedule, or can happen immediately at start_date. The default
is
- ``True``, but subdags will ignore this value and always behave as
if
- this is set to ``False`` for backward compatibility.
- :type align: bool
:return: A list of dates within the interval following the dag's
schedule.
:rtype: list
"""
- if start_date is None:
- start = self._time_restriction.earliest
- else:
- start = pendulum.instance(start_date)
+ warnings.warn(
+ "`DAG.get_run_dates()` is deprecated. Please use
`DAG.iter_dagrun_infos_between()` instead.",
+ category=DeprecationWarning,
+ stacklevel=2,
+ )
+ earliest = timezone.coerce_datetime(start_date)
if end_date is None:
- end = pendulum.now(timezone.utc)
+ latest = pendulum.now(timezone.utc)
else:
- end = pendulum.instance(end_date)
- # HACK: Sub-DAGs are currently scheduled differently. For example, say
- # the schedule is @daily and start is 2021-06-03 22:16:00, a top-level
- # DAG should be first scheduled to run on midnight 2021-06-04, but a
- # sub-DAG should be first scheduled to run RIGHT NOW. We can change
- # this, but since the sub-DAG is going away in 3.0 anyway, let's keep
- # compatibility for now and remove this entirely later.
- if self.is_subdag:
- align = False
- return sorted(self.timetable.iter_between(start, end, align=align))
+ latest = pendulum.instance(end_date)
+ return [info.logical_date for info in
self.iter_dagrun_infos_between(earliest, latest)]
def normalize_schedule(self, dttm):
warnings.warn(
@@ -2018,6 +2068,7 @@ class DAG(LoggingMixin):
session=None,
dag_hash: Optional[str] = None,
creating_job_id: Optional[int] = None,
+ data_interval: Optional[Tuple[datetime, datetime]] = None,
):
"""
Creates a dag run from this dag including the tasks associated with
this dag.
@@ -2043,6 +2094,8 @@ class DAG(LoggingMixin):
:type session: sqlalchemy.orm.session.Session
:param dag_hash: Hash of Serialized DAG
:type dag_hash: str
+ :param data_interval: Data interval of the DagRun
+ :type data_interval: tuple[datetime, datetime] | None
"""
if run_id and not run_type:
if not isinstance(run_id, str):
@@ -2057,6 +2110,9 @@ class DAG(LoggingMixin):
"Creating DagRun needs either `run_id` or both `run_type` and
`execution_date`"
)
+ if run_type == DagRunType.MANUAL and data_interval is None and
execution_date is not None:
+ data_interval =
self.timetable.infer_data_interval(timezone.coerce_datetime(execution_date))
+
run = DagRun(
dag_id=self.dag_id,
run_id=run_id,
@@ -2068,6 +2124,7 @@ class DAG(LoggingMixin):
run_type=run_type,
dag_hash=dag_hash,
creating_job_id=creating_job_id,
+ data_interval=data_interval,
)
session.add(run)
session.flush()
@@ -2410,9 +2467,14 @@ class DagModel(Base):
has_task_concurrency_limits = Column(Boolean, nullable=False)
- # The execution_date of the next dag run
+ # The logical date of the next dag run.
next_dagrun = Column(UtcDateTime)
- # Earliest time at which this ``next_dagrun`` can be created
+
+ # Must be either both NULL or both datetime.
+ next_dagrun_data_interval_start = Column(UtcDateTime)
+ next_dagrun_data_interval_end = Column(UtcDateTime)
+
+ # Earliest time at which this ``next_dagrun`` can be created.
next_dagrun_create_after = Column(UtcDateTime)
__table_args__ = (
@@ -2446,6 +2508,26 @@ class DagModel(Base):
return f"<DAG: {self.dag_id}>"
@property
+ def next_dagrun_data_interval(self) -> Optional[Tuple[datetime, datetime]]:
+ if self.next_dagrun_data_interval_start is None:
+ if self.next_dagrun_data_interval_end is not None:
+ raise AirflowException(
+ f"Inconsistent DagModel: "
+
f"next_dagrun_data_interval_start={self.next_dagrun_data_interval_start!r}, "
+
f"next_dagrun_data_interval_end={self.next_dagrun_data_interval_end!r}; "
+ f"they must be either both None or both datetime"
+ )
+ return None
+ return (self.next_dagrun_data_interval_start,
self.next_dagrun_data_interval_end)
+
+ @next_dagrun_data_interval.setter
+ def next_dagrun_data_interval(self, value: Optional[Tuple[datetime,
datetime]]) -> None:
+ if value is None:
+ self.next_dagrun_data_interval_start =
self.next_dagrun_data_interval_end = None
+ else:
+ self.next_dagrun_data_interval_start,
self.next_dagrun_data_interval_end = value
+
+ @property
def timezone(self):
return settings.TIMEZONE
@@ -2588,7 +2670,13 @@ class DagModel(Base):
:param most_recent_dag_run: DateTime of most recent run of this dag,
or none if not yet scheduled.
:param active_runs_of_dag: Number of currently active runs of this dag
"""
- self.next_dagrun, self.next_dagrun_create_after =
dag.next_dagrun_info(most_recent_dag_run)
+ next_dagrun_info = dag.next_dagrun_info(most_recent_dag_run)
+ if next_dagrun_info is None:
+ self.next_dagrun_data_interval = self.next_dagrun =
self.next_dagrun_create_after = None
+ else:
+ self.next_dagrun_data_interval = next_dagrun_info.data_interval
+ self.next_dagrun = next_dagrun_info.logical_date
+ self.next_dagrun_create_after = next_dagrun_info.run_after
if dag.max_active_runs and active_runs_of_dag >= dag.max_active_runs:
# Since this happens every time the dag is parsed it would be
quite spammy at info
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 508312b..098fd5b 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -76,6 +76,9 @@ class DagRun(Base, LoggingMixin):
external_trigger = Column(Boolean, default=True)
run_type = Column(String(50), nullable=False)
conf = Column(PickleType)
+ # These two must be either both NULL or both datetime.
+ data_interval_start = Column(UtcDateTime)
+ data_interval_end = Column(UtcDateTime)
# When a scheduler last attempted to schedule TIs for this DagRun
last_scheduling_decision = Column(UtcDateTime)
dag_hash = Column(String(32))
@@ -91,7 +94,7 @@ class DagRun(Base, LoggingMixin):
task_instances = relationship(
TI,
- primaryjoin=and_(TI.dag_id == dag_id, TI.execution_date ==
execution_date), # type: ignore
+ primaryjoin=and_(TI.dag_id == dag_id, TI.execution_date ==
execution_date),
foreign_keys=(dag_id, execution_date),
backref=backref('dag_run', uselist=False),
)
@@ -115,7 +118,14 @@ class DagRun(Base, LoggingMixin):
run_type: Optional[str] = None,
dag_hash: Optional[str] = None,
creating_job_id: Optional[int] = None,
+ data_interval: Optional[Tuple[datetime, datetime]] = None,
):
+ if data_interval is None:
+ # Legacy: Only happen for runs created prior to Airflow 2.2.
+ self.data_interval_start = self.data_interval_end = None
+ else:
+ self.data_interval_start, self.data_interval_end = data_interval
+
self.dag_id = dag_id
self.run_id = run_id
self.execution_date = execution_date
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 6c2ca8d..6f145f9 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -23,7 +23,7 @@ import os
import pickle
import signal
import warnings
-from collections import defaultdict
+from collections import defaultdict, namedtuple
from datetime import datetime, timedelta
from functools import partial
from tempfile import NamedTemporaryFile
@@ -54,6 +54,7 @@ from sqlalchemy.sql.expression import tuple_
from sqlalchemy.sql.sqltypes import BigInteger
from airflow import settings
+from airflow.compat.functools import cache
from airflow.configuration import conf
from airflow.exceptions import (
AirflowException,
@@ -818,51 +819,69 @@ class TaskInstance(Base, LoggingMixin):
return count == len(task.downstream_task_ids)
@provide_session
- def get_previous_ti(
- self, state: Optional[str] = None, session: Session = None
- ) -> Optional['TaskInstance']:
- """
- The task instance for the task that ran before this task instance.
+ def get_previous_dagrun(
+ self,
+ state: Optional[str] = None,
+ session: Optional[Session] = None,
+ ) -> Optional["DagRun"]:
+ """The DagRun that ran before this task instance's DagRun.
:param state: If passed, it only take into account instances of a
specific state.
- :param session: SQLAlchemy ORM Session
+ :param session: SQLAlchemy ORM Session.
"""
dag = self.task.dag
- if dag:
- dr = self.get_dagrun(session=session)
+ if dag is None:
+ return None
- # LEGACY: most likely running from unit tests
- if not dr:
- # Means that this TaskInstance is NOT being run from a DR, but
from a catchup
- try:
- # XXX: This uses DAG internals, but as the outer comment
- # said, the block is only reached for legacy reasons for
- # development code, so that's OK-ish.
- schedule = dag.timetable._schedule
- except AttributeError:
- return None
- dt = pendulum.instance(self.execution_date)
- return TaskInstance(
- task=self.task,
- execution_date=schedule.get_prev(dt),
- )
+ dr = self.get_dagrun(session=session)
+
+ # LEGACY: most likely running from unit tests
+ if not dr:
+ # Means that this TaskInstance is NOT being run from a DR, but
from a catchup
+ try:
+ # XXX: This uses DAG internals, but as the outer comment
+ # said, the block is only reached for legacy reasons for
+ # development code, so that's OK-ish.
+ schedule = dag.timetable._schedule
+ except AttributeError:
+ return None
+ dt = pendulum.instance(self.execution_date)
+ return TaskInstance(
+ task=self.task,
+ execution_date=schedule.get_prev(dt),
+ )
- dr.dag = dag
+ dr.dag = dag
- # We always ignore schedule in dagrun lookup when `state` is given
or `schedule_interval is None`.
- # For legacy reasons, when `catchup=True`, we use
`get_previous_scheduled_dagrun` unless
- # `ignore_schedule` is `True`.
- ignore_schedule = state is not None or dag.schedule_interval is
None
- if dag.catchup is True and not ignore_schedule:
- last_dagrun = dr.get_previous_scheduled_dagrun(session=session)
- else:
- last_dagrun = dr.get_previous_dagrun(session=session,
state=state)
+ # We always ignore schedule in dagrun lookup when `state` is given or
`schedule_interval is None`.
+ # For legacy reasons, when `catchup=True`, we use
`get_previous_scheduled_dagrun` unless
+ # `ignore_schedule` is `True`.
+ ignore_schedule = state is not None or dag.schedule_interval is None
+ if dag.catchup is True and not ignore_schedule:
+ last_dagrun = dr.get_previous_scheduled_dagrun(session=session)
+ else:
+ last_dagrun = dr.get_previous_dagrun(session=session, state=state)
- if last_dagrun:
- return last_dagrun.get_task_instance(self.task_id,
session=session)
+ if last_dagrun:
+ return last_dagrun
return None
+ @provide_session
+ def get_previous_ti(
+ self, state: Optional[str] = None, session: Session = None
+ ) -> Optional['TaskInstance']:
+ """
+ The task instance for the task that ran before this task instance.
+
+ :param state: If passed, it only take into account instances of a
specific state.
+ :param session: SQLAlchemy ORM Session
+ """
+ dagrun = self.get_previous_dagrun(state, session=session)
+ if dagrun is None:
+ return None
+ return dagrun.get_task_instance(self.task_id, session=session)
+
@property
def previous_ti(self):
"""
@@ -1685,66 +1704,63 @@ class TaskInstance(Base, LoggingMixin):
integrate_macros_plugins()
- params = {} # type: Dict[str, Any]
- run_id = ''
- dag_run = None
- if hasattr(task, 'dag'):
- if task.dag.params:
- params.update(task.dag.params)
- from airflow.models.dagrun import DagRun # Avoid circular import
-
- dag_run = (
- session.query(DagRun)
- .filter_by(dag_id=task.dag.dag_id,
execution_date=self.execution_date)
- .first()
+ dag_run = self.get_dagrun()
+
+ # FIXME: Many tests don't create a DagRun. We should fix the tests.
+ if dag_run is None:
+ FakeDagRun = namedtuple(
+ "FakeDagRun",
+ # A minimal set of attributes to keep things working.
+ "conf data_interval_start data_interval_end external_trigger
run_id",
+ )
+ dag_run = FakeDagRun(
+ conf=None,
+ data_interval_start=None,
+ data_interval_end=None,
+ external_trigger=False,
+ run_id="",
)
- run_id = dag_run.run_id if dag_run else None
- session.expunge_all()
- session.commit()
- ds = self.execution_date.strftime('%Y-%m-%d')
- ts = self.execution_date.isoformat()
- yesterday_ds = (self.execution_date -
timedelta(1)).strftime('%Y-%m-%d')
- tomorrow_ds = (self.execution_date + timedelta(1)).strftime('%Y-%m-%d')
-
- # For manually triggered dagruns that aren't run on a schedule,
next/previous
- # schedule dates don't make sense, and should be set to execution date
for
- # consistency with how execution_date is set for manually triggered
tasks, i.e.
- # triggered_date == execution_date.
- if dag_run and dag_run.external_trigger:
- prev_execution_date = self.execution_date
- next_execution_date = self.execution_date
- else:
- prev_execution_date =
task.dag.previous_schedule(self.execution_date)
- next_execution_date =
task.dag.following_schedule(self.execution_date)
-
- next_ds = None
- next_ds_nodash = None
- if next_execution_date:
- next_ds = next_execution_date.strftime('%Y-%m-%d')
- next_ds_nodash = next_ds.replace('-', '')
- next_execution_date = pendulum.instance(next_execution_date)
-
- prev_ds = None
- prev_ds_nodash = None
- if prev_execution_date:
- prev_ds = prev_execution_date.strftime('%Y-%m-%d')
- prev_ds_nodash = prev_ds.replace('-', '')
- prev_execution_date = pendulum.instance(prev_execution_date)
+ params = {} # type: Dict[str, Any]
+ with contextlib.suppress(AttributeError):
+ params.update(task.dag.params)
+ if task.params:
+ params.update(task.params)
+ if conf.getboolean('core', 'dag_run_conf_overrides_params'):
+ self.overwrite_params_with_dag_run_conf(params=params,
dag_run=dag_run)
+ # DagRuns scheduled prior to Airflow 2.2 and by tests don't always have
+ # a data interval, and we default to execution_date for compatibility.
+ compat_interval_start =
timezone.coerce_datetime(dag_run.data_interval_start or self.execution_date)
+ ds = compat_interval_start.strftime('%Y-%m-%d')
ds_nodash = ds.replace('-', '')
- ts_nodash = self.execution_date.strftime('%Y%m%dT%H%M%S')
+ ts = compat_interval_start.isoformat()
+ ts_nodash = compat_interval_start.strftime('%Y%m%dT%H%M%S')
ts_nodash_with_tz = ts.replace('-', '').replace(':', '')
- yesterday_ds_nodash = yesterday_ds.replace('-', '')
- tomorrow_ds_nodash = tomorrow_ds.replace('-', '')
- ti_key_str = f"{task.dag_id}__{task.task_id}__{ds_nodash}"
+ @cache # Prevent multiple database access.
+ def _get_previous_dagrun_success() -> Optional["DagRun"]:
+ return self.get_previous_dagrun(state=State.SUCCESS,
session=session)
- if task.params:
- params.update(task.params)
+ def get_prev_data_interval_start_success() ->
Optional[pendulum.DateTime]:
+ dagrun = _get_previous_dagrun_success()
+ if dagrun is None:
+ return None
+ return timezone.coerce_datetime(dagrun.data_interval_start)
- if conf.getboolean('core', 'dag_run_conf_overrides_params'):
- self.overwrite_params_with_dag_run_conf(params=params,
dag_run=dag_run)
+ def get_prev_data_interval_end_success() ->
Optional[pendulum.DateTime]:
+ dagrun = _get_previous_dagrun_success()
+ if dagrun is None:
+ return None
+ return timezone.coerce_datetime(dagrun.data_interval_end)
+
+ def get_prev_start_date_success() -> Optional[pendulum.DateTime]:
+ dagrun = _get_previous_dagrun_success()
+ if dagrun is None:
+ return None
+ return timezone.coerce_datetime(dagrun.start_date)
+
+ # Custom accessors.
class VariableAccessor:
"""
@@ -1826,37 +1842,129 @@ class TaskInstance(Base, LoggingMixin):
except AirflowNotFoundException:
return default_conn
+ # Create lazy proxies for deprecated stuff.
+
+ def deprecated_proxy(func, *, key, replacement=None) ->
lazy_object_proxy.Proxy:
+ def deprecated_func():
+ message = (
+ f"Accessing {key!r} from the template is deprecated and "
+ f"will be removed in a future version."
+ )
+ if replacement:
+ message += f" Please use {replacement!r} instead."
+ warnings.warn(message, DeprecationWarning)
+ return func()
+
+ return lazy_object_proxy.Proxy(deprecated_func)
+
+ @cache
+ def get_yesterday_ds() -> str:
+ return (self.execution_date - timedelta(1)).strftime('%Y-%m-%d')
+
+ def get_yesterday_ds_nodash() -> str:
+ return get_yesterday_ds().replace('-', '')
+
+ @cache
+ def get_tomorrow_ds() -> str:
+ return (self.execution_date + timedelta(1)).strftime('%Y-%m-%d')
+
+ def get_tomorrow_ds_nodash() -> str:
+ return get_tomorrow_ds().replace('-', '')
+
+ @cache
+ def get_next_execution_date() -> Optional[pendulum.DateTime]:
+ # For manually triggered dagruns that aren't run on a schedule,
+ # next/previous execution dates don't make sense, and should be set
+ # to execution date for consistency with how execution_date is set
+ # for manually triggered tasks, i.e. triggered_date ==
execution_date.
+ if dag_run.external_trigger:
+ next_execution_date = self.execution_date
+ else:
+ next_execution_date =
task.dag.following_schedule(self.execution_date)
+ if next_execution_date is None:
+ return None
+ return timezone.coerce_datetime(next_execution_date)
+
+ def get_next_ds() -> Optional[str]:
+ execution_date = get_next_execution_date()
+ if execution_date is None:
+ return None
+ return execution_date.strftime('%Y-%m-%d')
+
+ def get_next_ds_nodash() -> Optional[str]:
+ ds = get_next_ds()
+ if ds is None:
+ return ds
+ return ds.replace('-', '')
+
+ @cache
+ def get_prev_execution_date():
+ if dag_run.external_trigger:
+ return timezone.coerce_datetime(self.execution_date)
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ return task.dag.previous_schedule(self.execution_date)
+
+ @cache
+ def get_prev_ds() -> Optional[str]:
+ execution_date = get_prev_execution_date()
+ if execution_date is None:
+ return None
+ return execution_date.strftime(r'%Y-%m-%d')
+
+ def get_prev_ds_nodash() -> Optional[str]:
+ prev_ds = get_prev_ds()
+ if prev_ds is None:
+ return None
+ return prev_ds.replace('-', '')
+
return {
'conf': conf,
'dag': task.dag,
'dag_run': dag_run,
+ 'data_interval_end':
timezone.coerce_datetime(dag_run.data_interval_end),
+ 'data_interval_start':
timezone.coerce_datetime(dag_run.data_interval_start),
'ds': ds,
'ds_nodash': ds_nodash,
- 'execution_date': pendulum.instance(self.execution_date),
+ 'execution_date': deprecated_proxy(
+ lambda: timezone.coerce_datetime(self.execution_date),
+ key='execution_date',
+ replacement='data_interval_start',
+ ),
'inlets': task.inlets,
'macros': macros,
- 'next_ds': next_ds,
- 'next_ds_nodash': next_ds_nodash,
- 'next_execution_date': next_execution_date,
+ 'next_ds': deprecated_proxy(get_next_ds, key="next_ds",
replacement="data_interval_end | ds"),
+ 'next_ds_nodash': deprecated_proxy(
+ get_next_ds_nodash,
+ key="next_ds_nodash",
+ replacement="data_interval_end | ds_nodash",
+ ),
+ 'next_execution_date': deprecated_proxy(
+ get_next_execution_date,
+ key='next_execution_date',
+ replacement='data_interval_end',
+ ),
'outlets': task.outlets,
'params': params,
- 'prev_ds': prev_ds,
- 'prev_ds_nodash': prev_ds_nodash,
- 'prev_execution_date': prev_execution_date,
- 'prev_execution_date_success': lazy_object_proxy.Proxy(
- lambda: self.get_previous_execution_date(state=State.SUCCESS)
- ),
- 'prev_start_date_success': lazy_object_proxy.Proxy(
- lambda: self.get_previous_start_date(state=State.SUCCESS)
+ 'prev_data_interval_start_success':
lazy_object_proxy.Proxy(get_prev_data_interval_start_success),
+ 'prev_data_interval_end_success':
lazy_object_proxy.Proxy(get_prev_data_interval_end_success),
+ 'prev_ds': deprecated_proxy(get_prev_ds, key="prev_ds"),
+ 'prev_ds_nodash': deprecated_proxy(get_prev_ds_nodash,
key="prev_ds_nodash"),
+ 'prev_execution_date': deprecated_proxy(get_prev_execution_date,
key='prev_execution_date'),
+ 'prev_execution_date_success': deprecated_proxy(
+ lambda: self.get_previous_execution_date(state=State.SUCCESS,
session=session),
+ key='prev_execution_date_success',
+ replacement='prev_data_interval_start_success',
),
- 'run_id': run_id,
+ 'prev_start_date_success':
lazy_object_proxy.Proxy(get_prev_start_date_success),
+ 'run_id': dag_run.run_id,
'task': task,
'task_instance': self,
- 'task_instance_key_str': ti_key_str,
+ 'task_instance_key_str':
f"{task.dag_id}__{task.task_id}__{ds_nodash}",
'test_mode': self.test_mode,
'ti': self,
- 'tomorrow_ds': tomorrow_ds,
- 'tomorrow_ds_nodash': tomorrow_ds_nodash,
+ 'tomorrow_ds': deprecated_proxy(get_tomorrow_ds,
key='tomorrow_ds'),
+ 'tomorrow_ds_nodash': deprecated_proxy(get_tomorrow_ds_nodash,
key='tomorrow_ds_nodash'),
'ts': ts,
'ts_nodash': ts_nodash,
'ts_nodash_with_tz': ts_nodash_with_tz,
@@ -1865,8 +1973,8 @@ class TaskInstance(Base, LoggingMixin):
'value': VariableAccessor(),
},
'conn': ConnectionAccessor(),
- 'yesterday_ds': yesterday_ds,
- 'yesterday_ds_nodash': yesterday_ds_nodash,
+ 'yesterday_ds': deprecated_proxy(get_yesterday_ds,
key='yesterday_ds'),
+ 'yesterday_ds_nodash': deprecated_proxy(get_yesterday_ds_nodash,
key='yesterday_ds_nodash'),
}
def get_rendered_template_fields(self):
diff --git a/airflow/operators/python.py b/airflow/operators/python.py
index 511cb99..c0f6393 100644
--- a/airflow/operators/python.py
+++ b/airflow/operators/python.py
@@ -341,8 +341,11 @@ class PythonVirtualenvOperator(PythonOperator):
self.python_version = python_version
self.use_dill = use_dill
self.system_site_packages = system_site_packages
- if not self.system_site_packages and self.use_dill and 'dill' not in
self.requirements:
- self.requirements.append('dill')
+ if not self.system_site_packages:
+ if 'lazy-object-proxy' not in self.requirements:
+ self.requirements.append('lazy-object-proxy')
+ if self.use_dill and 'dill' not in self.requirements:
+ self.requirements.append('dill')
self.pickling_library = dill if self.use_dill else pickle
def execute(self, context: Dict):
diff --git a/airflow/operators/subdag.py b/airflow/operators/subdag.py
index 66ac6fb..62b9b77 100644
--- a/airflow/operators/subdag.py
+++ b/airflow/operators/subdag.py
@@ -21,8 +21,9 @@ The module which provides a way to nest your DAGs and so your
levels of complexi
"""
import warnings
+from datetime import datetime
from enum import Enum
-from typing import Dict, Optional
+from typing import Dict, Optional, Tuple
from sqlalchemy.orm.session import Session
@@ -159,12 +160,17 @@ class SubDagOperator(BaseSensorOperator):
dag_run = self._get_dagrun(execution_date)
if dag_run is None:
+ if context['data_interval_start'] is None or
context['data_interval_end'] is None:
+ data_interval: Optional[Tuple[datetime, datetime]] = None
+ else:
+ data_interval = (context['data_interval_start'],
context['data_interval_end'])
dag_run = self.subdag.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=execution_date,
state=State.RUNNING,
conf=self.conf,
external_trigger=True,
+ data_interval=data_interval,
)
self.log.info("Created DagRun: %s", dag_run.run_id)
else:
diff --git a/airflow/timetables/base.py b/airflow/timetables/base.py
index 5faf9db..7027851 100644
--- a/airflow/timetables/base.py
+++ b/airflow/timetables/base.py
@@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
-from typing import Iterator, NamedTuple, Optional
+from typing import NamedTuple, Optional
from pendulum import DateTime
@@ -23,14 +23,16 @@ from airflow.typing_compat import Protocol
class DataInterval(NamedTuple):
- """A data interval for a DagRun to operate over.
-
- The represented interval is ``[start, end)``.
- """
+ """A data interval for a DagRun to operate over."""
start: DateTime
end: DateTime
+ @classmethod
+    def exact(cls, at: DateTime) -> "DataInterval":
+ """Represent an "interval" containing only an exact time."""
+ return cls(start=at, end=at)
+
class TimeRestriction(NamedTuple):
"""Restriction on when a DAG can be scheduled for a run.
@@ -62,12 +64,12 @@ class DagRunInfo(NamedTuple):
"""The earliest time this DagRun is created and its tasks scheduled."""
data_interval: DataInterval
- """The data interval this DagRun to operate over, if applicable."""
+ """The data interval this DagRun to operate over."""
@classmethod
def exact(cls, at: DateTime) -> "DagRunInfo":
"""Represent a run on an exact time."""
- return cls(run_after=at, data_interval=DataInterval(at, at))
+ return cls(run_after=at, data_interval=DataInterval.exact(at))
@classmethod
def interval(cls, start: DateTime, end: DateTime) -> "DagRunInfo":
@@ -79,6 +81,15 @@ class DagRunInfo(NamedTuple):
"""
return cls(run_after=end, data_interval=DataInterval(start, end))
+ @property
+ def logical_date(self) -> DateTime:
+ """Infer the logical date to represent a DagRun.
+
+ This replaces ``execution_date`` in Airflow 2.1 and prior. The idea is
+ essentially the same, just a different name.
+ """
+ return self.data_interval.start
+
class Timetable(Protocol):
"""Protocol that all Timetable classes are expected to implement."""
@@ -90,6 +101,14 @@ class Timetable(Protocol):
"""
raise NotImplementedError()
+ def infer_data_interval(self, run_after: DateTime) -> DataInterval:
+ """When a DAG run is manually triggered, infer a data interval for it.
+
+ This is used for e.g. manually-triggered runs, where ``run_after``
would
+ be when the user triggers the run.
+ """
+ raise NotImplementedError()
+
def next_dagrun_info(
self,
last_automated_dagrun: Optional[DateTime],
@@ -108,26 +127,3 @@ class Timetable(Protocol):
a DagRunInfo object when asked at another time.
"""
raise NotImplementedError()
-
- def iter_between(
- self,
- start: DateTime,
- end: DateTime,
- *,
- align: bool,
- ) -> Iterator[DateTime]:
- """Get schedules between the *start* and *end*."""
- if start > end:
- raise ValueError(f"start ({start}) > end ({end})")
- between = TimeRestriction(start, end, catchup=True)
-
- if align:
- next_info = self.next_dagrun_info(None, between)
- else:
- yield start
- next_info = self.next_dagrun_info(start, between)
-
- while next_info is not None:
- dagrun_start = next_info.data_interval.start
- yield dagrun_start
- next_info = self.next_dagrun_info(dagrun_start, between)
diff --git a/airflow/timetables/interval.py b/airflow/timetables/interval.py
index 6b3a46e..9fe4de2 100644
--- a/airflow/timetables/interval.py
+++ b/airflow/timetables/interval.py
@@ -20,7 +20,7 @@ from typing import Any, Optional
from pendulum import DateTime
-from airflow.timetables.base import DagRunInfo, TimeRestriction, Timetable
+from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction,
Timetable
from airflow.timetables.schedules import CronSchedule, Delta, DeltaSchedule,
Schedule
@@ -43,6 +43,13 @@ class _DataIntervalTimetable(Timetable):
def validate(self) -> None:
self._schedule.validate()
+ def infer_data_interval(self, run_after: DateTime) -> DataInterval:
+ # Get the last complete period before run_after, e.g. if a DAG run is
+ # scheduled at each midnight, the data interval of a manually triggered
+ # run at 1am 25th is between 0am 24th and 0am 25th.
+ end = self._schedule.get_prev(self._schedule.align(run_after))
+ return DataInterval(start=self._schedule.get_prev(end), end=end)
+
def next_dagrun_info(
self,
last_automated_dagrun: Optional[DateTime],
diff --git a/airflow/timetables/simple.py b/airflow/timetables/simple.py
index b6208a6..f9c062e 100644
--- a/airflow/timetables/simple.py
+++ b/airflow/timetables/simple.py
@@ -19,7 +19,7 @@ from typing import Any, Optional
from pendulum import DateTime
-from airflow.timetables.base import DagRunInfo, TimeRestriction, Timetable
+from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction,
Timetable
class NullTimetable(Timetable):
@@ -37,6 +37,9 @@ class NullTimetable(Timetable):
def validate(self) -> None:
pass
+ def infer_data_interval(self, run_after: DateTime) -> DataInterval:
+ return DataInterval.exact(run_after)
+
def next_dagrun_info(
self,
last_automated_dagrun: Optional[DateTime],
@@ -60,6 +63,9 @@ class OnceTimetable(Timetable):
def validate(self) -> None:
pass
+ def infer_data_interval(self, run_after: DateTime) -> DataInterval:
+ return DataInterval.exact(run_after)
+
def next_dagrun_info(
self,
last_automated_dagrun: Optional[DateTime],
diff --git a/airflow/utils/file.py b/airflow/utils/file.py
index c022079..f1c5b4d 100644
--- a/airflow/utils/file.py
+++ b/airflow/utils/file.py
@@ -73,6 +73,8 @@ def correct_maybe_zipped(fileloc):
If the path contains a folder with a .zip suffix, then
the folder is treated as a zip archive and path to zip is returned.
"""
+ if not fileloc:
+ return fileloc
_, archive, _ = ZIP_REGEX.search(fileloc).groups()
if archive and zipfile.is_zipfile(archive):
return archive
diff --git a/docs/apache-airflow/migrations-ref.rst
b/docs/apache-airflow/migrations-ref.rst
index 6d58055..0803557 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -23,7 +23,9 @@ Here's the list of all the Database Migrations that are
executed via when you ru
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version |
Description
|
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
-| ``54bebd308c5f`` (head) | ``97cdd93827b8`` | | Adds
``trigger`` table and deferrable operator columns to task instance
|
+| ``142555e44c17`` (head) | ``54bebd308c5f`` | | Add
``data_interval_start`` and ``data_interval_end`` to ``DagRun``
|
++--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
+| ``54bebd308c5f`` | ``97cdd93827b8`` | | Adds
``trigger`` table and deferrable operator columns to task instance
|
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| ``97cdd93827b8`` | ``30867afad44a`` | | Add
``queued_at`` column in ``dag_run`` table
|
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
diff --git a/docs/apache-airflow/templates-ref.rst
b/docs/apache-airflow/templates-ref.rst
index 8616b68..0b3dd7c 100644
--- a/docs/apache-airflow/templates-ref.rst
+++ b/docs/apache-airflow/templates-ref.rst
@@ -23,7 +23,8 @@ Templates reference
Variables, macros and filters can be used in templates (see the
:ref:`concepts:jinja-templating` section)
The following come for free out of the box with Airflow.
-Additional custom macros can be added globally through :doc:`plugins`, or at a
DAG level through the ``DAG.user_defined_macros`` argument.
+Additional custom macros can be added globally through :doc:`plugins`, or at a
DAG level through the
+``DAG.user_defined_macros`` argument.
.. _templates:variables:
@@ -32,55 +33,76 @@ Variables
The Airflow engine passes a few variables by default that are accessible
in all templates
+==========================================
====================================
+Variable Description
+==========================================
====================================
+``{{ data_interval_start }}`` Start of the data interval
(`pendulum.Pendulum`_ or ``None``).
+``{{ data_interval_end }}`` End of the data interval
(`pendulum.Pendulum`_ or ``None``).
+``{{ ds }}`` Start of the data interval as
``YYYY-MM-DD``.
+ Same as ``{{ data_interval_start |
ds }}``.
+``{{ ds_nodash }}`` Start of the data interval as
``YYYYMMDD``.
+ Same as ``{{ data_interval_start |
ds_nodash }}``.
+``{{ ts }}`` Same as ``{{ data_interval_start |
ts }}``.
+ Example:
``2018-01-01T00:00:00+00:00``.
+``{{ ts_nodash_with_tz }}`` Same as ``{{ data_interval_start |
ts_nodash_with_tz }}``.
+ Example: ``20180101T000000+0000``.
+``{{ ts_nodash }}`` Same as ``{{ data_interval_start |
ts_nodash }}``.
+ Example: ``20180101T000000``.
+``{{ prev_data_interval_start_success }}`` Start of the data interval from
prior successful DAG run
+ (`pendulum.Pendulum`_ or ``None``).
+``{{ prev_data_interval_end_success }}`` End of the data interval from
prior successful DAG run
+ (`pendulum.Pendulum`_ or ``None``).
+``{{ prev_start_date_success }}`` Start date from prior successful
dag run (if available)
+ (`pendulum.Pendulum`_ or ``None``).
+``{{ dag }}`` The DAG object.
+``{{ task }}`` The Task object.
+``{{ macros }}`` A reference to the macros package,
described below.
+``{{ task_instance }}`` The task_instance object.
+``{{ ti }}`` Same as ``{{ task_instance }}``.
+``{{ params }}`` A reference to the user-defined
params dictionary which can be
+ overridden by the dictionary
passed through ``trigger_dag -c`` if
+ you enabled
``dag_run_conf_overrides_params`` in ``airflow.cfg``.
+``{{ var.value.my_var }}`` Global defined variables
represented as a dictionary.
+``{{ var.json.my_var.path }}`` Global defined variables
represented as a dictionary.
+ With deserialized JSON object,
append the path to the key within
+ the JSON object.
+``{{ conn.my_conn_id }}`` Connection represented as a
dictionary.
+``{{ task_instance_key_str }}`` A unique, human-readable key to
the task instance formatted
+
``{dag_id}__{task_id}__{ds_nodash}``.
+``{{ conf }}`` The full configuration object
located at
+ ``airflow.configuration.conf``
which represents the content of
+ your ``airflow.cfg``.
+``{{ run_id }}`` The ``run_id`` of the current DAG
run.
+``{{ dag_run }}`` A reference to the DagRun object.
+``{{ test_mode }}`` Whether the task instance was
called using the CLI's test
+ subcommand.
+==========================================
====================================
+
+The following variables are deprecated. They are kept for backward
compatibility, but you should convert
+existing code to use other variables instead.
+
===================================== ====================================
-Variable Description
+Deprecated Variable Description
===================================== ====================================
-``{{ ds }}`` the execution date as ``YYYY-MM-DD``
-``{{ ds_nodash }}`` the execution date as ``YYYYMMDD``
-``{{ prev_ds }}`` the previous execution date as
``YYYY-MM-DD``
- if ``{{ ds }}`` is ``2018-01-08`` and
``schedule_interval`` is ``@weekly``,
- ``{{ prev_ds }}`` will be
``2018-01-01``
-``{{ prev_ds_nodash }}`` the previous execution date as
``YYYYMMDD`` if exists, else ``None``
-``{{ next_ds }}`` the next execution date as
``YYYY-MM-DD``
- if ``{{ ds }}`` is ``2018-01-01`` and
``schedule_interval`` is ``@weekly``,
- ``{{ next_ds }}`` will be
``2018-01-08``
+``{{ execution_date }}`` the execution date (logical date),
same as ``logical_date``
+``{{ next_execution_date }}`` the next execution date (if available)
(`pendulum.Pendulum`_)
+ if ``{{ execution_date }}`` is
``2018-01-01 00:00:00`` and
+ ``schedule_interval`` is ``@weekly``,
``{{ next_execution_date }}``
+ will be ``2018-01-08 00:00:00``
+``{{ next_ds }}`` the next execution date as
``YYYY-MM-DD`` if exists, else ``None``
``{{ next_ds_nodash }}`` the next execution date as
``YYYYMMDD`` if exists, else ``None``
+``{{ prev_execution_date }}`` the previous execution date (if
available) (`pendulum.Pendulum`_)
+ if ``{{ execution_date }}`` is
``2018-01-08 00:00:00`` and
+ ``schedule_interval`` is ``@weekly``,
``{{ prev_execution_date }}``
+ will be ``2018-01-01 00:00:00``
+``{{ prev_ds }}`` the previous execution date as
``YYYY-MM-DD`` if exists, else ``None``
+``{{ prev_ds_nodash }}`` the previous execution date as
``YYYYMMDD`` if exists, else ``None``
``{{ yesterday_ds }}`` the day before the execution date as
``YYYY-MM-DD``
``{{ yesterday_ds_nodash }}`` the day before the execution date as
``YYYYMMDD``
``{{ tomorrow_ds }}`` the day after the execution date as
``YYYY-MM-DD``
``{{ tomorrow_ds_nodash }}`` the day after the execution date as
``YYYYMMDD``
-``{{ ts }}`` same as
``execution_date.isoformat()``. Example: ``2018-01-01T00:00:00+00:00``
-``{{ ts_nodash }}`` same as ``ts`` without ``-``, ``:``
and TimeZone info. Example: ``20180101T000000``
-``{{ ts_nodash_with_tz }}`` same as ``ts`` without ``-`` and
``:``. Example: ``20180101T000000+0000``
-``{{ execution_date }}`` the execution_date (logical date)
(`pendulum.Pendulum`_)
-``{{ prev_execution_date }}`` the previous execution date (if
available) (`pendulum.Pendulum`_)
-``{{ prev_execution_date_success }}`` execution date from prior successful
dag run (if available) (`pendulum.Pendulum`_)
-``{{ prev_start_date_success }}`` start date from prior successful dag
run (if available) (`pendulum.Pendulum`_)
-``{{ next_execution_date }}`` the next execution date
(`pendulum.Pendulum`_)
-``{{ dag }}`` the DAG object
-``{{ task }}`` the Task object
-``{{ macros }}`` a reference to the macros package,
described below
-``{{ task_instance }}`` the task_instance object
-``{{ ti }}`` same as ``{{ task_instance }}``
-``{{ params }}`` a reference to the user-defined params
dictionary which can be overridden by
- the dictionary passed through
``trigger_dag -c`` if you enabled
- ``dag_run_conf_overrides_params`` in
``airflow.cfg``
-``{{ var.value.my_var }}`` global defined variables represented
as a dictionary
-``{{ var.json.my_var.path }}`` global defined variables represented
as a dictionary
- with deserialized JSON object, append
the path to the
- key within the JSON object
-``{{ conn.my_conn_id }}`` connection represented as a dictionary
-
-``{{ task_instance_key_str }}`` a unique, human-readable key to the
task instance
- formatted
``{dag_id}__{task_id}__{ds_nodash}``
-``{{ conf }}`` the full configuration object located
at
- ``airflow.configuration.conf`` which
- represents the content of your
- ``airflow.cfg``
-``{{ run_id }}`` the ``run_id`` of the current DAG run
-``{{ dag_run }}`` a reference to the DagRun object
-``{{ test_mode }}`` whether the task instance was called
using
- the CLI's test subcommand
+``{{ prev_execution_date_success }}`` execution date from prior successful
dag run
+
===================================== ====================================
Note that you can access the object's attributes and methods with simple
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 0aa13b2..03bd7d8 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -20,7 +20,7 @@ import pytest
from parameterized import parameterized
from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
-from airflow.models import DagModel, DagRun
+from airflow.models import DAG, DagModel, DagRun
from airflow.security import permissions
from airflow.utils import timezone
from airflow.utils.session import create_session, provide_session
@@ -96,7 +96,8 @@ class TestDagRunEndpoint:
dag_instance = DagModel(dag_id=dag_id)
with create_session() as session:
session.add(dag_instance)
- session.commit()
+ dag = DAG(dag_id=dag_id, schedule_interval=None)
+ self.app.dag_bag.bag_dag(dag, root_dag=dag)
def _create_test_dag_run(self, state='running', extra_dag=False,
commit=True):
dag_runs = []
diff --git a/tests/cli/commands/test_dag_command.py
b/tests/cli/commands/test_dag_command.py
index 4a510f8..fd8404a 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -412,8 +412,34 @@ class TestCliDags(unittest.TestCase):
def test_trigger_dag(self):
dag_command.dag_trigger(
- self.parser.parse_args(['dags', 'trigger',
'example_bash_operator', '--conf', '{"foo": "bar"}'])
+ self.parser.parse_args(
+ [
+ 'dags',
+ 'trigger',
+ 'example_bash_operator',
+ '--run-id=test_trigger_dag',
+ '--exec-date=2021-06-04T09:00:00+08:00',
+ '--conf={"foo": "bar"}',
+ ],
+ ),
)
+ with create_session() as session:
+ dagrun = session.query(DagRun).filter(DagRun.run_id ==
"test_trigger_dag").one()
+
+ assert dagrun, "DagRun not created"
+ assert dagrun.run_type == DagRunType.MANUAL
+ assert dagrun.external_trigger
+ assert dagrun.conf == {"foo": "bar"}
+
+ # Coerced to UTC.
+ assert dagrun.execution_date.isoformat(timespec="seconds") ==
"2021-06-04T01:00:00+00:00"
+
+ # example_bash_operator runs every day at midnight, so the data
interval
+ # should be aligned to the previous day.
+ assert dagrun.data_interval_start.isoformat(timespec="seconds") ==
"2021-06-03T00:00:00+00:00"
+ assert dagrun.data_interval_end.isoformat(timespec="seconds") ==
"2021-06-04T00:00:00+00:00"
+
+ def test_trigger_dag_invalid_conf(self):
with pytest.raises(ValueError):
dag_command.dag_trigger(
self.parser.parse_args(
diff --git a/tests/core/test_core.py b/tests/core/test_core.py
index fe31bd2..d8a5687 100644
--- a/tests/core/test_core.py
+++ b/tests/core/test_core.py
@@ -269,11 +269,13 @@ class TestCore:
op.resolve_template_files()
def test_task_get_template(self):
- TI = TaskInstance
- ti = TI(task=self.runme_0, execution_date=DEFAULT_DATE)
+ ti = TaskInstance(task=self.runme_0, execution_date=DEFAULT_DATE)
ti.dag = self.dag_bash
self.dag_bash.create_dagrun(
- run_type=DagRunType.MANUAL, state=State.RUNNING,
execution_date=DEFAULT_DATE
+ run_type=DagRunType.MANUAL,
+ state=State.RUNNING,
+ execution_date=DEFAULT_DATE,
+ data_interval=(DEFAULT_DATE, DEFAULT_DATE + timedelta(days=1)),
)
ti.run(ignore_ti_state=True)
context = ti.get_template_context()
@@ -282,23 +284,35 @@ class TestCore:
assert context['ds'] == '2015-01-01'
assert context['ds_nodash'] == '20150101'
- # next_ds is 2015-01-02 as the dag interval is daily
+ # next_ds is 2015-01-02 as the dag schedule is daily.
assert context['next_ds'] == '2015-01-02'
assert context['next_ds_nodash'] == '20150102'
- # prev_ds is 2014-12-31 as the dag interval is daily
- assert context['prev_ds'] == '2014-12-31'
- assert context['prev_ds_nodash'] == '20141231'
-
assert context['ts'] == '2015-01-01T00:00:00+00:00'
assert context['ts_nodash'] == '20150101T000000'
assert context['ts_nodash_with_tz'] == '20150101T000000+0000'
- assert context['yesterday_ds'] == '2014-12-31'
- assert context['yesterday_ds_nodash'] == '20141231'
-
- assert context['tomorrow_ds'] == '2015-01-02'
- assert context['tomorrow_ds_nodash'] == '20150102'
+ assert context['data_interval_start'].isoformat() ==
'2015-01-01T00:00:00+00:00'
+ assert context['data_interval_end'].isoformat() ==
'2015-01-02T00:00:00+00:00'
+
+ # Test deprecated fields.
+ expected_deprecated_fields = [
+ ("prev_ds", "2014-12-31"),
+ ("prev_ds_nodash", "20141231"),
+ ("yesterday_ds", "2014-12-31"),
+ ("yesterday_ds_nodash", "20141231"),
+ ("tomorrow_ds", "2015-01-02"),
+ ("tomorrow_ds_nodash", "20150102"),
+ ]
+ for key, expected_value in expected_deprecated_fields:
+ message = (
+ f"Accessing {key!r} from the template is deprecated and "
+ f"will be removed in a future version."
+ )
+ with pytest.deprecated_call() as recorder:
+ value = str(context[key]) # Simulate template evaluation to
trigger warning.
+ assert value == expected_value
+ assert [str(m.message) for m in recorder] == [message]
def test_local_task_job(self):
TI = TaskInstance
@@ -412,13 +426,10 @@ class TestCore:
ti = TI(task=task, execution_date=execution_date)
context = ti.get_template_context()
- # next_ds/prev_ds should be the execution date for manually triggered runs
+ # next_ds should be the execution date for manually triggered runs
assert context['next_ds'] == execution_ds
assert context['next_ds_nodash'] == execution_ds_nodash
- assert context['prev_ds'] == execution_ds
- assert context['prev_ds_nodash'] == execution_ds_nodash
-
def test_dag_params_and_task_params(self, dag_maker):
# This test case guards how params of DAG and Operator work together.
# - If any key exists in either DAG's or Operator's params,
diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py
index 25077fc..d28552a 100644
--- a/tests/decorators/test_python.py
+++ b/tests/decorators/test_python.py
@@ -268,6 +268,7 @@ class TestAirflowTaskDecorator(TestPythonBase):
self.dag.create_dagrun(
run_id=DagRunType.MANUAL,
execution_date=DEFAULT_DATE,
+ data_interval=(DEFAULT_DATE, DEFAULT_DATE),
start_date=DEFAULT_DATE,
state=State.RUNNING,
)
@@ -299,6 +300,7 @@ class TestAirflowTaskDecorator(TestPythonBase):
self.dag.create_dagrun(
run_id=DagRunType.MANUAL,
execution_date=DEFAULT_DATE,
+ data_interval=(DEFAULT_DATE, DEFAULT_DATE),
start_date=DEFAULT_DATE,
state=State.RUNNING,
)
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index d70606a..cf3b5c5 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -955,8 +955,8 @@ class TestBackfillJob:
# reached, so it is waiting
dag_run_created_cond.wait(timeout=1.5)
dagruns = DagRun.find(dag_id=dag_id)
- dr = dagruns[0]
assert 1 == len(dagruns)
+ dr = dagruns[0]
assert dr.run_id == run_id
# allow the backfill to execute
@@ -1330,30 +1330,34 @@ class TestBackfillJob:
session.close()
- def test_dag_get_run_dates(self, dag_maker):
+ def test_dag_dagrun_infos_between(self, dag_maker):
with dag_maker(
- dag_id='test_get_dates', start_date=DEFAULT_DATE, schedule_interval="@hourly"
+ dag_id='dagrun_infos_between', start_date=DEFAULT_DATE, schedule_interval="@hourly"
) as test_dag:
DummyOperator(
task_id='dummy',
owner='airflow',
)
- assert [DEFAULT_DATE] == test_dag.get_run_dates(
- start_date=DEFAULT_DATE,
- end_date=DEFAULT_DATE,
- align=True,
- )
+ assert [DEFAULT_DATE] == [
+ info.logical_date
+ for info in test_dag.iter_dagrun_infos_between(
+ earliest=DEFAULT_DATE,
+ latest=DEFAULT_DATE,
+ )
+ ]
assert [
DEFAULT_DATE - datetime.timedelta(hours=3),
DEFAULT_DATE - datetime.timedelta(hours=2),
DEFAULT_DATE - datetime.timedelta(hours=1),
DEFAULT_DATE,
- ] == test_dag.get_run_dates(
- start_date=DEFAULT_DATE - datetime.timedelta(hours=3),
- end_date=DEFAULT_DATE,
- align=True,
- )
+ ] == [
+ info.logical_date
+ for info in test_dag.iter_dagrun_infos_between(
+ earliest=DEFAULT_DATE - datetime.timedelta(hours=3),
+ latest=DEFAULT_DATE,
+ )
+ ]
def test_backfill_run_backwards(self):
dag = self.dagbag.get_dag("test_start_date_scheduling")
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index c41aca8..9030005 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1250,7 +1250,10 @@ class TestSchedulerJob:
Test if a a dagrun would be scheduled if max_dag_runs has
been reached but dagrun_timeout is also reached
"""
- with dag_maker(dag_id='test_scheduler_verify_max_active_runs_and_dagrun_timeout') as dag:
+ with dag_maker(
+ dag_id='test_scheduler_verify_max_active_runs_and_dagrun_timeout',
+ start_date=DEFAULT_DATE,
+ ) as dag:
DummyOperator(task_id='dummy')
dag.max_active_runs = 1
dag.dagrun_timeout = datetime.timedelta(seconds=60)
@@ -1278,6 +1281,8 @@ class TestSchedulerJob:
assert orm_dag.next_dagrun_create_after
# But we should record the date of _what run_ it would be
assert isinstance(orm_dag.next_dagrun, datetime.datetime)
+ assert isinstance(orm_dag.next_dagrun_data_interval_start, datetime.datetime)
+ assert isinstance(orm_dag.next_dagrun_data_interval_end, datetime.datetime)
# Should be scheduled as dagrun_timeout has passed
dr.start_date = timezone.utcnow() - datetime.timedelta(days=1)
@@ -1294,6 +1299,8 @@ class TestSchedulerJob:
assert dr.state == State.FAILED
session.refresh(orm_dag)
assert isinstance(orm_dag.next_dagrun, datetime.datetime)
+ assert isinstance(orm_dag.next_dagrun_data_interval_start, datetime.datetime)
+ assert isinstance(orm_dag.next_dagrun_data_interval_end, datetime.datetime)
assert isinstance(orm_dag.next_dagrun_create_after, datetime.datetime)
expected_callback = DagCallbackRequest(
@@ -1572,9 +1579,12 @@ class TestSchedulerJob:
run_kwargs = {}
dag = self.dagbag.get_dag(dag_id)
+ dagrun_info = dag.next_dagrun_info(None)
+ assert dagrun_info is not None
+
dr = dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
- execution_date=dag.next_dagrun_info(None)[0],
+ execution_date=dagrun_info.logical_date,
state=State.RUNNING,
)
@@ -2801,21 +2811,28 @@ class TestSchedulerJob:
# Verify that dag_model.next_dagrun is equal to next execution_date
dag_model = session.query(DagModel).get(dag.dag_id)
assert dag_model.next_dagrun == DEFAULT_DATE
+ assert dag_model.next_dagrun_data_interval_start == DEFAULT_DATE
+ assert dag_model.next_dagrun_data_interval_end == DEFAULT_DATE + timedelta(minutes=1)
self.scheduler_job = SchedulerJob(subdir=os.devnull)
self.scheduler_job.executor = MockExecutor(do_update=False)
self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
- # Verify a DagRun is created with the correct execution_date
+ # Verify a DagRun is created with the correct dates
# when Scheduler._do_scheduling is run in the Scheduler Loop
self.scheduler_job._do_scheduling(session)
dr1 = dag.get_dagrun(DEFAULT_DATE, session=session)
assert dr1 is not None
assert dr1.state == State.RUNNING
+ assert dr1.execution_date == DEFAULT_DATE
+ assert dr1.data_interval_start == DEFAULT_DATE
+ assert dr1.data_interval_end == DEFAULT_DATE + timedelta(minutes=1)
- # Verify that dag_model.next_dagrun is set to next execution_date
+ # Verify that dag_model.next_dagrun is set to next interval
dag_model = session.query(DagModel).get(dag.dag_id)
assert dag_model.next_dagrun == DEFAULT_DATE + timedelta(minutes=1)
+ assert dag_model.next_dagrun_data_interval_start == DEFAULT_DATE + timedelta(minutes=1)
+ assert dag_model.next_dagrun_data_interval_end == DEFAULT_DATE + timedelta(minutes=2)
# Trigger the Dag externally
dr = dag.create_dagrun(
@@ -2833,6 +2850,8 @@ class TestSchedulerJob:
# triggered DagRun.
dag_model = session.query(DagModel).get(dag.dag_id)
assert dag_model.next_dagrun == DEFAULT_DATE + timedelta(minutes=1)
+ assert dag_model.next_dagrun_data_interval_start == DEFAULT_DATE + timedelta(minutes=1)
+ assert dag_model.next_dagrun_data_interval_end == DEFAULT_DATE + timedelta(minutes=2)
def test_scheduler_create_dag_runs_check_existing_run(self, dag_maker):
"""
@@ -2887,7 +2906,9 @@ class TestSchedulerJob:
# Test that this does not raise any error
self.scheduler_job._create_dag_runs([dag_model], session)
- # Assert dag_model.next_dagrun is set correctly to next execution date
+ # Assert the next dagrun fields are set correctly to next execution date
+ assert dag_model.next_dagrun_data_interval_start == DEFAULT_DATE + timedelta(days=1)
+ assert dag_model.next_dagrun_data_interval_end == DEFAULT_DATE + timedelta(days=2)
assert dag_model.next_dagrun == DEFAULT_DATE + timedelta(days=1)
session.rollback()
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 0b673b9..3807bbb 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -49,6 +49,7 @@ from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.subdag import SubDagOperator
from airflow.security import permissions
+from airflow.timetables.base import DagRunInfo
from airflow.timetables.simple import NullTimetable, OnceTimetable
from airflow.utils import timezone
from airflow.utils.file import list_py_file_paths
@@ -1482,12 +1483,11 @@ class TestDag(unittest.TestCase):
'test_scheduler_dagrun_once', start_date=timezone.datetime(2015, 1, 1), schedule_interval="@once"
)
- next_date, _ = dag.next_dagrun_info(None)
+ next_info = dag.next_dagrun_info(None)
+ assert next_info and next_info.logical_date == timezone.datetime(2015, 1, 1)
- assert next_date == timezone.datetime(2015, 1, 1)
-
- next_date, _ = dag.next_dagrun_info(next_date)
- assert next_date is None
+ next_info = dag.next_dagrun_info(next_info.logical_date)
+ assert next_info is None
def test_next_dagrun_info_start_end_dates(self):
"""
@@ -1506,15 +1506,16 @@ class TestDag(unittest.TestCase):
dates = []
date = None
for _ in range(runs):
- date, _ = dag.next_dagrun_info(date)
- dates.append(date)
-
- for date in dates:
- assert date is not None
-
+ next_info = dag.next_dagrun_info(date)
+ if next_info is None:
+ dates.append(None)
+ else:
+ date = next_info.logical_date
+ dates.append(date)
+
+ assert all(date is not None for date in dates)
assert dates[-1] == end_date
-
- assert dag.next_dagrun_info(date)[0] is None
+ assert dag.next_dagrun_info(date) is None
def test_next_dagrun_info_catchup(self):
"""
@@ -1596,12 +1597,12 @@ class TestDag(unittest.TestCase):
catchup=False,
)
- next_date, _ = dag.next_dagrun_info(None)
- assert next_date == timezone.datetime(2020, 1, 4)
+ next_info = dag.next_dagrun_info(None)
+ assert next_info and next_info.logical_date == timezone.datetime(2020, 1, 4)
# The date to create is in the future, this is handled by "DagModel.dags_needing_dagruns"
- next_date, _ = dag.next_dagrun_info(next_date)
- assert next_date == timezone.datetime(2020, 1, 5)
+ next_info = dag.next_dagrun_info(next_info.logical_date)
+ assert next_info and next_info.logical_date == timezone.datetime(2020, 1, 5)
@freeze_time(timezone.datetime(2020, 5, 4))
def test_next_dagrun_info_timedelta_schedule_and_catchup_true(self):
@@ -1616,18 +1617,18 @@ class TestDag(unittest.TestCase):
catchup=True,
)
- next_date, _ = dag.next_dagrun_info(None)
- assert next_date == timezone.datetime(2020, 5, 1)
+ next_info = dag.next_dagrun_info(None)
+ assert next_info and next_info.logical_date == timezone.datetime(2020, 5, 1)
- next_date, _ = dag.next_dagrun_info(next_date)
- assert next_date == timezone.datetime(2020, 5, 2)
+ next_info = dag.next_dagrun_info(next_info.logical_date)
+ assert next_info and next_info.logical_date == timezone.datetime(2020, 5, 2)
- next_date, _ = dag.next_dagrun_info(next_date)
- assert next_date == timezone.datetime(2020, 5, 3)
+ next_info = dag.next_dagrun_info(next_info.logical_date)
+ assert next_info and next_info.logical_date == timezone.datetime(2020, 5, 3)
# The date to create is in the future, this is handled by "DagModel.dags_needing_dagruns"
- next_date, _ = dag.next_dagrun_info(next_date)
- assert next_date == timezone.datetime(2020, 5, 4)
+ next_info = dag.next_dagrun_info(next_info.logical_date)
+ assert next_info and next_info.logical_date == timezone.datetime(2020, 5, 4)
def test_next_dagrun_after_auto_align(self):
"""
@@ -1643,8 +1644,8 @@ class TestDag(unittest.TestCase):
)
DummyOperator(task_id='dummy', dag=dag, owner='airflow')
- next_date, _ = dag.next_dagrun_info(None)
- assert next_date == timezone.datetime(2016, 1, 2, 5, 4)
+ next_info = dag.next_dagrun_info(None)
+ assert next_info and next_info.logical_date == timezone.datetime(2016, 1, 2, 5, 4)
dag = DAG(
dag_id='test_scheduler_auto_align_2',
@@ -1653,8 +1654,8 @@ class TestDag(unittest.TestCase):
)
DummyOperator(task_id='dummy', dag=dag, owner='airflow')
- next_date, _ = dag.next_dagrun_info(None)
- assert next_date == timezone.datetime(2016, 1, 1, 10, 10)
+ next_info = dag.next_dagrun_info(None)
+ assert next_info and next_info.logical_date == timezone.datetime(2016, 1, 1, 10, 10)
def test_next_dagrun_after_not_for_subdags(self):
"""
@@ -1693,11 +1694,11 @@ class TestDag(unittest.TestCase):
subdag.parent_dag = dag
subdag.is_subdag = True
- next_date, _ = dag.next_dagrun_info(None)
- assert next_date == timezone.datetime(2019, 1, 1, 0, 0)
+ next_parent_info = dag.next_dagrun_info(None)
+ assert next_parent_info.logical_date == timezone.datetime(2019, 1, 1, 0, 0)
- next_subdag_date, _ = subdag.next_dagrun_info(None)
- assert next_subdag_date is None, "SubDags should never have DagRuns created by the scheduler"
+ next_subdag_info = subdag.next_dagrun_info(None)
+ assert next_subdag_info is None, "SubDags should never have DagRuns created by the scheduler"
def test_replace_outdated_access_control_actions(self):
outdated_permissions = {
@@ -2029,3 +2030,46 @@ def test_set_task_instance_state():
assert dagrun.get_state() == State.QUEUED
assert {t.key for t in altered} == {('test_set_task_instance_state', 'task_1', start_date, 1)}
+
+
[email protected](
+ "start_date, expected_infos",
+ [
+ (
+ DEFAULT_DATE,
+ [DagRunInfo.interval(DEFAULT_DATE, DEFAULT_DATE + datetime.timedelta(hours=1))],
+ ),
+ (
+ DEFAULT_DATE - datetime.timedelta(hours=3),
+ [
+ DagRunInfo.interval(
+ DEFAULT_DATE - datetime.timedelta(hours=3),
+ DEFAULT_DATE - datetime.timedelta(hours=2),
+ ),
+ DagRunInfo.interval(
+ DEFAULT_DATE - datetime.timedelta(hours=2),
+ DEFAULT_DATE - datetime.timedelta(hours=1),
+ ),
+ DagRunInfo.interval(
+ DEFAULT_DATE - datetime.timedelta(hours=1),
+ DEFAULT_DATE,
+ ),
+ DagRunInfo.interval(
+ DEFAULT_DATE,
+ DEFAULT_DATE + datetime.timedelta(hours=1),
+ ),
+ ],
+ ),
+ ],
+ ids=["in-dag-restriction", "out-of-dag-restriction"],
+)
+def test_iter_dagrun_infos_between(start_date, expected_infos):
+ dag = DAG(dag_id='test_get_dates', start_date=DEFAULT_DATE, schedule_interval="@hourly")
+ DummyOperator(task_id='dummy', dag=dag)
+
+ iterator = dag.iter_dagrun_infos_between(
+ earliest=pendulum.instance(start_date),
+ latest=pendulum.instance(DEFAULT_DATE),
+ align=True,
+ )
+ assert expected_infos == list(iterator)
diff --git a/tests/models/test_taskinstance.py
b/tests/models/test_taskinstance.py
index 93f6138..cf3952c 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -1446,13 +1446,20 @@ class TestTaskInstance:
schedule_interval='0 12 * * *',
)
- ti = TI(task=task, execution_date=timezone.utcnow())
+ execution_date = timezone.utcnow()
+
+ dag.create_dagrun(
+ execution_date=execution_date,
+ state=State.RUNNING,
+ run_type=DagRunType.MANUAL,
+ )
+
+ ti = TI(task=task, execution_date=execution_date)
template_context = ti.get_template_context()
- assert isinstance(template_context["execution_date"], pendulum.DateTime)
- assert isinstance(template_context["next_execution_date"], pendulum.DateTime)
- assert isinstance(template_context["prev_execution_date"], pendulum.DateTime)
+ assert isinstance(template_context["data_interval_start"], pendulum.DateTime)
+ assert isinstance(template_context["data_interval_end"], pendulum.DateTime)
@pytest.mark.parametrize(
"content, expected_output",
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index 5828f1f..82fbd22 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -162,6 +162,7 @@ class TestPythonOperator(TestPythonBase):
self.dag.create_dagrun(
run_type=DagRunType.MANUAL,
execution_date=DEFAULT_DATE,
+ data_interval=(DEFAULT_DATE, DEFAULT_DATE),
start_date=DEFAULT_DATE,
state=State.RUNNING,
)
@@ -199,6 +200,7 @@ class TestPythonOperator(TestPythonBase):
self.dag.create_dagrun(
run_type=DagRunType.MANUAL,
execution_date=DEFAULT_DATE,
+ data_interval=(DEFAULT_DATE, DEFAULT_DATE),
start_date=DEFAULT_DATE,
state=State.RUNNING,
)
diff --git a/tests/operators/test_subdag_operator.py
b/tests/operators/test_subdag_operator.py
index 9ebdc55..dcdb508 100644
--- a/tests/operators/test_subdag_operator.py
+++ b/tests/operators/test_subdag_operator.py
@@ -148,13 +148,20 @@ class TestSubDagOperator(unittest.TestCase):
subdag_task._get_dagrun = Mock()
subdag_task._get_dagrun.side_effect = [None, self.dag_run_success,
self.dag_run_success]
- subdag_task.pre_execute(context={'execution_date': DEFAULT_DATE})
- subdag_task.execute(context={'execution_date': DEFAULT_DATE})
- subdag_task.post_execute(context={'execution_date': DEFAULT_DATE})
+ context = {
+ 'data_interval_start': None,
+ 'data_interval_end': None,
+ 'execution_date': DEFAULT_DATE,
+ }
+
+ subdag_task.pre_execute(context=context)
+ subdag_task.execute(context=context)
+ subdag_task.post_execute(context=context)
subdag.create_dagrun.assert_called_once_with(
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
+ data_interval=None,
conf=None,
state=State.RUNNING,
external_trigger=True,
@@ -178,13 +185,20 @@ class TestSubDagOperator(unittest.TestCase):
subdag_task._get_dagrun = Mock()
subdag_task._get_dagrun.side_effect = [None, self.dag_run_success,
self.dag_run_success]
- subdag_task.pre_execute(context={'execution_date': DEFAULT_DATE})
- subdag_task.execute(context={'execution_date': DEFAULT_DATE})
- subdag_task.post_execute(context={'execution_date': DEFAULT_DATE})
+ context = {
+ 'data_interval_start': None,
+ 'data_interval_end': None,
+ 'execution_date': DEFAULT_DATE,
+ }
+
+ subdag_task.pre_execute(context=context)
+ subdag_task.execute(context=context)
+ subdag_task.post_execute(context=context)
subdag.create_dagrun.assert_called_once_with(
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
+ data_interval=None,
conf=conf,
state=State.RUNNING,
external_trigger=True,
@@ -206,10 +220,16 @@ class TestSubDagOperator(unittest.TestCase):
subdag_task._get_dagrun = Mock()
subdag_task._get_dagrun.side_effect = [None, self.dag_run_failed,
self.dag_run_failed]
+ context = {
+ 'data_interval_start': None,
+ 'data_interval_end': None,
+ 'execution_date': DEFAULT_DATE,
+ }
+
with pytest.raises(AirflowException):
- subdag_task.pre_execute(context={'execution_date': DEFAULT_DATE})
- subdag_task.execute(context={'execution_date': DEFAULT_DATE})
- subdag_task.post_execute(context={'execution_date': DEFAULT_DATE})
+ subdag_task.pre_execute(context=context)
+ subdag_task.execute(context=context)
+ subdag_task.post_execute(context=context)
def test_execute_skip_if_dagrun_success(self):
"""
@@ -223,9 +243,15 @@ class TestSubDagOperator(unittest.TestCase):
subdag_task._get_dagrun = Mock()
subdag_task._get_dagrun.return_value = self.dag_run_success
- subdag_task.pre_execute(context={'execution_date': DEFAULT_DATE})
- subdag_task.execute(context={'execution_date': DEFAULT_DATE})
- subdag_task.post_execute(context={'execution_date': DEFAULT_DATE})
+ context = {
+ 'data_interval_start': None,
+ 'data_interval_end': None,
+ 'execution_date': DEFAULT_DATE,
+ }
+
+ subdag_task.pre_execute(context=context)
+ subdag_task.execute(context=context)
+ subdag_task.post_execute(context=context)
subdag.create_dagrun.assert_not_called()
assert 3 == len(subdag_task._get_dagrun.mock_calls)
diff --git a/tests/sensors/test_base.py b/tests/sensors/test_base.py
index f9e0ebf..dd3bf29 100644
--- a/tests/sensors/test_base.py
+++ b/tests/sensors/test_base.py
@@ -421,9 +421,8 @@ class TestBaseSensor(unittest.TestCase):
# poke returns False and AirflowRescheduleException is raised
date1 = timezone.utcnow()
with freeze_time(date1):
- dates = self.dag.get_run_dates(DEFAULT_DATE, end_date=DEFAULT_DATE, align=True)
- for date in dates:
- TaskInstance(sensor, date).run(ignore_ti_state=True, test_mode=True)
+ for info in self.dag.iter_dagrun_infos_between(DEFAULT_DATE, DEFAULT_DATE):
+ TaskInstance(sensor, info.logical_date).run(ignore_ti_state=True, test_mode=True)
tis = dr.get_task_instances()
assert len(tis) == 2
for ti in tis:
diff --git a/tests/sensors/test_python.py b/tests/sensors/test_python.py
index 37f0114..c4aecd4 100644
--- a/tests/sensors/test_python.py
+++ b/tests/sensors/test_python.py
@@ -76,6 +76,7 @@ class TestPythonSensor(TestPythonBase):
self.dag.create_dagrun(
run_type=DagRunType.MANUAL,
execution_date=DEFAULT_DATE,
+ data_interval=(DEFAULT_DATE, DEFAULT_DATE),
start_date=DEFAULT_DATE,
state=State.RUNNING,
)
@@ -117,6 +118,7 @@ class TestPythonSensor(TestPythonBase):
self.dag.create_dagrun(
run_type=DagRunType.MANUAL,
execution_date=DEFAULT_DATE,
+ data_interval=(DEFAULT_DATE, DEFAULT_DATE),
start_date=DEFAULT_DATE,
state=State.RUNNING,
)
diff --git a/tests/test_utils/perf/scheduler_dag_execution_timing.py
b/tests/test_utils/perf/scheduler_dag_execution_timing.py
index 0998a97..2adbd5f 100755
--- a/tests/test_utils/perf/scheduler_dag_execution_timing.py
+++ b/tests/test_utils/perf/scheduler_dag_execution_timing.py
@@ -166,7 +166,7 @@ def create_dag_runs(dag, num_runs, session):
last_dagrun_at = None
for _ in range(num_runs):
next_info = dag.next_dagrun_info(last_dagrun_at)
- last_dagrun_at = next_info.data_interval.start
+ last_dagrun_at = next_info.logical_date
dag.create_dagrun(
run_id=f"{id_prefix}{last_dagrun_at.isoformat()}",
execution_date=last_dagrun_at,
diff --git a/tests/timetables/test_time_table_iter_ranges.py
b/tests/timetables/test_time_table_iter_ranges.py
deleted file mode 100644
index c9ee747..0000000
--- a/tests/timetables/test_time_table_iter_ranges.py
+++ /dev/null
@@ -1,38 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""Tests for Timetable.iter_between()."""
-
-from datetime import datetime, timedelta
-
-import pytest
-
-from airflow.settings import TIMEZONE
-from airflow.timetables.interval import DeltaDataIntervalTimetable
-
-
[email protected]()
-def timetable_1s():
- return DeltaDataIntervalTimetable(timedelta(seconds=1))
-
-
-def test_end_date_before_start_date(timetable_1s):
- start = datetime(2016, 2, 1, tzinfo=TIMEZONE)
- end = datetime(2016, 1, 1, tzinfo=TIMEZONE)
- message = r"start \([- :+\d]{25}\) > end \([- :+\d]{25}\)"
- with pytest.raises(ValueError, match=message):
- list(timetable_1s.iter_between(start, end, align=True))
diff --git a/tests/www/views/test_views_trigger_dag.py
b/tests/www/views/test_views_trigger_dag.py
index 5540328..2ef3fd5 100644
--- a/tests/www/views/test_views_trigger_dag.py
+++ b/tests/www/views/test_views_trigger_dag.py
@@ -15,6 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import datetime
import json
import pytest
@@ -100,7 +101,7 @@ def test_trigger_dag_wrong_execution_date(admin_client):
assert run is None
-def test_trigger_dag_execution_date(admin_client):
+def test_trigger_dag_execution_date_data_interval(admin_client):
test_dag_id = "example_bash_operator"
exec_date = timezone.utcnow()
@@ -113,6 +114,12 @@ def test_trigger_dag_execution_date(admin_client):
assert run.run_type == DagRunType.MANUAL
assert run.execution_date == exec_date
+ # Since example_bash_operator runs once per day, the data interval should be
+ # between midnight yesterday and midnight today.
+ today_midnight = exec_date.replace(hour=0, minute=0, second=0, microsecond=0)
+ assert run.data_interval_start == (today_midnight - datetime.timedelta(days=1))
+ assert run.data_interval_end == today_midnight
+
def test_trigger_dag_form(admin_client):
test_dag_id = "example_bash_operator"