This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f352ee63a5 Replaced all days_ago functions with datetime functions (#23237)
f352ee63a5 is described below
commit f352ee63a5d09546a7997ba8f2f8702a1ddb4af7
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Mon May 23 11:32:52 2022 -0400
Replaced all days_ago functions with datetime functions (#23237)
Co-authored-by: Dev232001 <[email protected]>
---
airflow/example_dags/example_subdag_operator.py | 5 ++--
tests/api/common/test_delete_dag.py | 7 ++---
tests/api/common/test_mark_tasks.py | 21 ++++++++------
.../endpoints/test_extra_link_endpoint.py | 33 +++++++++-------------
tests/cli/commands/test_task_command.py | 12 ++++----
tests/dag_processing/test_processor.py | 18 ++++++------
tests/dags/test_default_views.py | 6 ++--
tests/dags/test_example_bash_operator.py | 9 ++----
tests/dags/test_mapped_classic.py | 5 ++--
tests/dags/test_mapped_taskflow.py | 5 ++--
tests/dags/test_miscellaneous.py | 7 ++---
tests/dags/test_missing_owner.py | 7 ++---
tests/dags/test_multiple_dags.py | 7 ++---
tests/dags/test_with_non_default_owner.py | 7 ++---
tests/jobs/test_backfill_job.py | 3 +-
tests/jobs/test_scheduler_job.py | 3 +-
tests/models/test_dagrun.py | 7 ++---
tests/operators/test_python.py | 3 +-
.../google/cloud/operators/test_mlengine.py | 3 +-
tests/test_utils/perf/dags/perf_dag_1.py | 14 +++++----
tests/test_utils/perf/dags/perf_dag_2.py | 14 +++++----
tests/utils/test_task_group.py | 3 +-
tests/www/views/test_views_decorators.py | 4 +--
tests/www/views/test_views_extra_links.py | 7 ++---
tests/www/views/test_views_task_norun.py | 4 +--
tests/www/views/test_views_tasks.py | 4 +--
26 files changed, 105 insertions(+), 113 deletions(-)
diff --git a/airflow/example_dags/example_subdag_operator.py
b/airflow/example_dags/example_subdag_operator.py
index 84a303815a..79d369d638 100644
--- a/airflow/example_dags/example_subdag_operator.py
+++ b/airflow/example_dags/example_subdag_operator.py
@@ -19,18 +19,19 @@
"""Example DAG demonstrating the usage of the SubDagOperator."""
# [START example_subdag_operator]
+import datetime
+
from airflow import DAG
from airflow.example_dags.subdags.subdag import subdag
from airflow.operators.empty import EmptyOperator
from airflow.operators.subdag import SubDagOperator
-from airflow.utils.dates import days_ago
DAG_NAME = 'example_subdag_operator'
with DAG(
dag_id=DAG_NAME,
default_args={"retries": 2},
- start_date=days_ago(2),
+ start_date=datetime.datetime(2022, 1, 1),
schedule_interval="@once",
tags=['example'],
) as dag:
diff --git a/tests/api/common/test_delete_dag.py
b/tests/api/common/test_delete_dag.py
index 2830020d29..cf51856409 100644
--- a/tests/api/common/test_delete_dag.py
+++ b/tests/api/common/test_delete_dag.py
@@ -16,14 +16,13 @@
# specific language governing permissions and limitations
# under the License.
-
import pytest
from airflow import models
from airflow.api.common.delete_dag import delete_dag
from airflow.exceptions import AirflowException, DagNotFound
from airflow.operators.empty import EmptyOperator
-from airflow.utils.dates import days_ago
+from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import State
from airflow.utils.types import DagRunType
@@ -73,11 +72,11 @@ class TestDeleteDAGSuccessfulDelete:
task = EmptyOperator(
task_id='dummy',
- dag=models.DAG(dag_id=self.key, default_args={'start_date':
days_ago(2)}),
+ dag=models.DAG(dag_id=self.key, default_args={'start_date':
timezone.datetime(2022, 1, 1)}),
owner='airflow',
)
- test_date = days_ago(1)
+ test_date = timezone.datetime(2022, 1, 1)
with create_session() as session:
session.add(DM(dag_id=self.key, fileloc=self.dag_file_path,
is_subdag=for_sub_dag))
dr = DR(dag_id=self.key, run_type=DagRunType.MANUAL,
run_id="test", execution_date=test_date)
diff --git a/tests/api/common/test_mark_tasks.py
b/tests/api/common/test_mark_tasks.py
index 77064560f2..fee33a50e8 100644
--- a/tests/api/common/test_mark_tasks.py
+++ b/tests/api/common/test_mark_tasks.py
@@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
-from datetime import timedelta
+import datetime
from typing import Callable
import pytest
@@ -34,7 +34,6 @@ from airflow.api.common.mark_tasks import (
)
from airflow.models import DagRun
from airflow.utils import timezone
-from airflow.utils.dates import days_ago
from airflow.utils.session import create_session, provide_session
from airflow.utils.state import State
from airflow.utils.types import DagRunType
@@ -62,12 +61,12 @@ class TestMarkTasks:
cls.dag2 = dagbag.get_dag('example_subdag_operator')
cls.dag3 = dagbag.get_dag('example_trigger_target_dag')
cls.dag4 = dagbag.get_dag('test_mapped_classic')
- cls.execution_dates = [days_ago(2), days_ago(1)]
+ cls.execution_dates = [timezone.datetime(2022, 1, 1),
timezone.datetime(2022, 1, 2)]
start_date3 = cls.dag3.start_date
cls.dag3_execution_dates = [
start_date3,
- start_date3 + timedelta(days=1),
- start_date3 + timedelta(days=2),
+ start_date3 + datetime.timedelta(days=1),
+ start_date3 + datetime.timedelta(days=2),
]
@pytest.fixture(autouse=True)
@@ -76,7 +75,7 @@ class TestMarkTasks:
clear_db_runs()
drs = _create_dagruns(
self.dag1,
- [_DagRunInfo(d, (d, d + timedelta(days=1))) for d in
self.execution_dates],
+ [_DagRunInfo(d, (d, d + datetime.timedelta(days=1))) for d in
self.execution_dates],
state=State.RUNNING,
run_type=DagRunType.SCHEDULED,
)
@@ -88,7 +87,7 @@ class TestMarkTasks:
[
_DagRunInfo(
self.dag2.start_date,
- (self.dag2.start_date, self.dag2.start_date +
timedelta(days=1)),
+ (self.dag2.start_date, self.dag2.start_date +
datetime.timedelta(days=1)),
),
],
state=State.RUNNING,
@@ -112,7 +111,7 @@ class TestMarkTasks:
[
_DagRunInfo(
self.dag4.start_date,
- (self.dag4.start_date, self.dag4.start_date +
timedelta(days=1)),
+ (self.dag4.start_date, self.dag4.start_date +
datetime.timedelta(days=1)),
)
],
state=State.SUCCESS,
@@ -482,7 +481,11 @@ class TestMarkDAGRun:
cls.dag1.sync_to_db()
cls.dag2 = dagbag.dags['example_subdag_operator']
cls.dag2.sync_to_db()
- cls.execution_dates = [days_ago(2), days_ago(1), days_ago(0)]
+ cls.execution_dates = [
+ timezone.datetime(2022, 1, 1),
+ timezone.datetime(2022, 1, 2),
+ timezone.datetime(2022, 1, 3),
+ ]
def setup_method(self):
clear_db_runs()
diff --git a/tests/api_connexion/endpoints/test_extra_link_endpoint.py
b/tests/api_connexion/endpoints/test_extra_link_endpoint.py
index a1eb3b8a54..c209f661a8 100644
--- a/tests/api_connexion/endpoints/test_extra_link_endpoint.py
+++ b/tests/api_connexion/endpoints/test_extra_link_endpoint.py
@@ -14,11 +14,11 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
import os
from urllib.parse import quote_plus
import pytest
-from parameterized import parameterized
from airflow import DAG
from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
@@ -28,8 +28,8 @@ from airflow.models.xcom import XCom
from airflow.plugins_manager import AirflowPlugin
from airflow.providers.google.cloud.operators.bigquery import
BigQueryExecuteQueryOperator
from airflow.security import permissions
+from airflow.utils import timezone
from airflow.utils.state import DagRunState
-from airflow.utils.timezone import datetime
from airflow.utils.types import DagRunType
from tests.test_utils.api_connexion_utils import create_user, delete_user
from tests.test_utils.db import clear_db_runs, clear_db_xcom
@@ -61,7 +61,7 @@ def configured_app(minimal_app_for_api):
class TestGetExtraLinks:
@pytest.fixture(autouse=True)
def setup_attrs(self, configured_app, session) -> None:
- self.default_time = datetime(2020, 1, 1)
+ self.default_time = timezone.datetime(2020, 1, 1)
clear_db_runs()
clear_db_xcom()
@@ -90,40 +90,35 @@ class TestGetExtraLinks:
clear_db_xcom()
def _create_dag(self):
- with DAG(
- dag_id="TEST_DAG_ID",
- default_args=dict(
- start_date=self.default_time,
- ),
- ) as dag:
+ with DAG(dag_id="TEST_DAG_ID", default_args={"start_date":
self.default_time}) as dag:
BigQueryExecuteQueryOperator(task_id="TEST_SINGLE_QUERY",
sql="SELECT 1")
BigQueryExecuteQueryOperator(task_id="TEST_MULTIPLE_QUERY",
sql=["SELECT 1", "SELECT 2"])
return dag
- @parameterized.expand(
+ @pytest.mark.parametrize(
+ "url, expected_title, expected_detail",
[
- (
- "missing_dag",
+ pytest.param(
"/api/v1/dags/INVALID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_SINGLE_QUERY/links",
"DAG not found",
'DAG with ID = "INVALID" not found',
+ id="missing_dag",
),
- (
- "missing_dag_run",
+ pytest.param(
"/api/v1/dags/TEST_DAG_ID/dagRuns/INVALID/taskInstances/TEST_SINGLE_QUERY/links",
"DAG Run not found",
'DAG Run with ID = "INVALID" not found',
+ id="missing_dag_run",
),
- (
- "missing_task",
+ pytest.param(
"/api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/INVALID/links",
"Task not found",
'Task with ID = "INVALID" not found',
+ id="missing_task",
),
- ]
+ ],
)
- def test_should_respond_404(self, name, url, expected_title,
expected_detail):
- del name
+ def test_should_respond_404(self, url, expected_title, expected_detail):
response = self.client.get(url, environ_overrides={'REMOTE_USER':
"test"})
assert 404 == response.status_code
diff --git a/tests/cli/commands/test_task_command.py
b/tests/cli/commands/test_task_command.py
index ffcc860388..1f4662b753 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -15,7 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-#
+
import io
import json
import logging
@@ -24,7 +24,6 @@ import re
import unittest
from argparse import ArgumentParser
from contextlib import redirect_stdout
-from datetime import datetime
from unittest import mock
import pytest
@@ -38,14 +37,13 @@ from airflow.exceptions import AirflowException,
DagRunNotFound
from airflow.models import DagBag, DagRun, Pool, TaskInstance
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils import timezone
-from airflow.utils.dates import days_ago
from airflow.utils.session import create_session
from airflow.utils.state import State
from airflow.utils.types import DagRunType
from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_pools, clear_db_runs
-DEFAULT_DATE = days_ago(1)
+DEFAULT_DATE = timezone.datetime(2022, 1, 1)
ROOT_FOLDER = os.path.realpath(
os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir,
os.pardir)
)
@@ -374,7 +372,7 @@ class TestCliTasks(unittest.TestCase):
dag2 = DagBag().dags['example_python_operator']
task2 = dag2.get_task(task_id='print_the_context')
- default_date2 = timezone.make_aware(datetime(2016, 1, 9))
+ default_date2 = timezone.datetime(2016, 1, 9)
dag2.clear()
dagrun = dag2.create_dagrun(
state=State.RUNNING,
@@ -417,7 +415,7 @@ class TestCliTasks(unittest.TestCase):
task_states_for_dag_run should return an AirflowException when invalid
dag id is passed
"""
with pytest.raises(DagRunNotFound):
- default_date2 = timezone.make_aware(datetime(2016, 1, 9))
+ default_date2 = timezone.datetime(2016, 1, 9)
task_command.task_states_for_dag_run(
self.parser.parse_args(
[
@@ -455,7 +453,7 @@ class TestLogsfromTaskRunCommand(unittest.TestCase):
self.run_id = "test_run"
self.dag_path = os.path.join(ROOT_FOLDER, "dags",
"test_logging_in_dag.py")
reset(self.dag_id)
- self.execution_date = timezone.make_aware(datetime(2017, 1, 1))
+ self.execution_date = timezone.datetime(2017, 1, 1)
self.execution_date_str = self.execution_date.isoformat()
self.task_args = ['tasks', 'run', self.dag_id, self.task_id,
'--local', self.execution_date_str]
self.log_dir = conf.get_mandatory_value('logging', 'base_log_folder')
diff --git a/tests/dag_processing/test_processor.py
b/tests/dag_processing/test_processor.py
index 9b8716bded..e2f9165131 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -16,7 +16,6 @@
# specific language governing permissions and limitations
# under the License.
#
-
import datetime
import os
from unittest import mock
@@ -34,7 +33,6 @@ from airflow.models import DagBag, DagModel, SlaMiss,
TaskInstance, errors
from airflow.models.taskinstance import SimpleTaskInstance
from airflow.operators.empty import EmptyOperator
from airflow.utils import timezone
-from airflow.utils.dates import days_ago
from airflow.utils.session import create_session
from airflow.utils.state import State
from airflow.utils.types import DagRunType
@@ -113,7 +111,7 @@ class TestDagFileProcessor:
# Create dag with a start of 1 day ago, but an sla of 0
# so we'll already have an sla_miss on the books.
- test_start_date = days_ago(1)
+ test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
dag, task = create_dummy_dag(
dag_id='test_sla_miss',
task_id='dummy',
@@ -142,7 +140,7 @@ class TestDagFileProcessor:
# Create dag with a start of 1 day ago, but an sla of 0
# so we'll already have an sla_miss on the books.
# Pass anything besides a timedelta object to the sla argument.
- test_start_date = days_ago(1)
+ test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
dag, task = create_dummy_dag(
dag_id='test_sla_miss',
task_id='dummy',
@@ -170,7 +168,7 @@ class TestDagFileProcessor:
# Create dag with a start of 2 days ago, but an sla of 1 day
# ago so we'll already have an sla_miss on the books
- test_start_date = days_ago(2)
+ test_start_date = timezone.utcnow() - datetime.timedelta(days=2)
dag, task = create_dummy_dag(
dag_id='test_sla_miss',
task_id='dummy',
@@ -206,7 +204,7 @@ class TestDagFileProcessor:
# Create dag with a start of 2 days ago, but an sla of 1 day
# ago so we'll already have an sla_miss on the books
- test_start_date = days_ago(2)
+ test_start_date = timezone.utcnow() - datetime.timedelta(days=2)
with dag_maker(
dag_id='test_sla_miss',
default_args={'start_date': test_start_date, 'sla':
datetime.timedelta(days=1)},
@@ -247,7 +245,7 @@ class TestDagFileProcessor:
sla_callback = MagicMock(side_effect=RuntimeError('Could not call
function'))
- test_start_date = days_ago(2)
+ test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
dag, task = create_dummy_dag(
dag_id='test_sla_miss',
task_id='dummy',
@@ -277,7 +275,7 @@ class TestDagFileProcessor:
):
session = settings.Session()
- test_start_date = days_ago(2)
+ test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
email1 = '[email protected]'
dag, task = create_dummy_dag(
dag_id='test_sla_miss',
@@ -317,7 +315,7 @@ class TestDagFileProcessor:
# Mock the callback function so we can verify that it was not called
mock_send_email.side_effect = RuntimeError('Could not send an email')
- test_start_date = days_ago(2)
+ test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
dag, task = create_dummy_dag(
dag_id='test_sla_miss',
task_id='dummy',
@@ -347,7 +345,7 @@ class TestDagFileProcessor:
"""
session = settings.Session()
- test_start_date = days_ago(2)
+ test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
dag, task = create_dummy_dag(
dag_id='test_sla_miss',
task_id='dummy',
diff --git a/tests/dags/test_default_views.py b/tests/dags/test_default_views.py
index ca51c10e62..6a1aefb4ee 100644
--- a/tests/dags/test_default_views.py
+++ b/tests/dags/test_default_views.py
@@ -15,10 +15,12 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
+import pendulum
+
from airflow.models import DAG
-from airflow.utils.dates import days_ago
-args = {'owner': 'airflow', 'retries': 3, 'start_date': days_ago(2)}
+args = {'owner': 'airflow', 'retries': 3, 'start_date':
pendulum.datetime(2022, 1, 1)}
tree_dag = DAG(
dag_id='test_tree_view',
diff --git a/tests/dags/test_example_bash_operator.py
b/tests/dags/test_example_bash_operator.py
index 4e44f55ff6..288667ad06 100644
--- a/tests/dags/test_example_bash_operator.py
+++ b/tests/dags/test_example_bash_operator.py
@@ -15,20 +15,17 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-from datetime import timedelta
+import datetime
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
-from airflow.utils.dates import days_ago
-
-args = {'owner': 'airflow', 'retries': 3, 'start_date': days_ago(2)}
dag = DAG(
dag_id='test_example_bash_operator',
- default_args=args,
+ default_args={'owner': 'airflow', 'retries': 3, 'start_date':
datetime.datetime(2022, 1, 1)},
schedule_interval='0 0 * * *',
- dagrun_timeout=timedelta(minutes=60),
+ dagrun_timeout=datetime.timedelta(minutes=60),
)
cmd = 'ls -l'
diff --git a/tests/dags/test_mapped_classic.py
b/tests/dags/test_mapped_classic.py
index cbf3a8a5b8..c3e0d413be 100644
--- a/tests/dags/test_mapped_classic.py
+++ b/tests/dags/test_mapped_classic.py
@@ -15,10 +15,11 @@
# specific language governing permissions and limitations
# under the License.
+import datetime
+
from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
-from airflow.utils.dates import days_ago
@task
@@ -30,7 +31,7 @@ def consumer(value):
print(repr(value))
-with DAG(dag_id='test_mapped_classic', start_date=days_ago(2)) as dag:
+with DAG(dag_id='test_mapped_classic', start_date=datetime.datetime(2022, 1,
1)) as dag:
PythonOperator.partial(task_id='consumer',
python_callable=consumer).expand(op_args=make_arg_lists())
PythonOperator.partial(task_id='consumer_literal',
python_callable=consumer).expand(
op_args=[[1], [2], [3]],
diff --git a/tests/dags/test_mapped_taskflow.py
b/tests/dags/test_mapped_taskflow.py
index e4e796c3e4..a803d9afbb 100644
--- a/tests/dags/test_mapped_taskflow.py
+++ b/tests/dags/test_mapped_taskflow.py
@@ -15,10 +15,11 @@
# specific language governing permissions and limitations
# under the License.
+import datetime
+
from airflow import DAG
-from airflow.utils.dates import days_ago
-with DAG(dag_id='test_mapped_taskflow', start_date=days_ago(2)) as dag:
+with DAG(dag_id='test_mapped_taskflow', start_date=datetime.datetime(2022, 1,
1)) as dag:
@dag.task
def make_list():
diff --git a/tests/dags/test_miscellaneous.py b/tests/dags/test_miscellaneous.py
index 7f77340e62..2174a61478 100644
--- a/tests/dags/test_miscellaneous.py
+++ b/tests/dags/test_miscellaneous.py
@@ -18,12 +18,11 @@
"""Example DAG demonstrating the usage of the BashOperator."""
-from datetime import timedelta
+import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
-from airflow.utils.dates import days_ago
args = {
'owner': 'airflow',
@@ -33,8 +32,8 @@ dag = DAG(
dag_id='miscellaneous_test_dag',
default_args=args,
schedule_interval='0 0 * * *',
- start_date=days_ago(2),
- dagrun_timeout=timedelta(minutes=60),
+ start_date=datetime.datetime(2022, 1, 1),
+ dagrun_timeout=datetime.timedelta(minutes=60),
tags=['example', 'example2'],
params={"example_key": "example_value"},
)
diff --git a/tests/dags/test_missing_owner.py b/tests/dags/test_missing_owner.py
index dc70a5b757..cc20db08c6 100644
--- a/tests/dags/test_missing_owner.py
+++ b/tests/dags/test_missing_owner.py
@@ -16,17 +16,16 @@
# specific language governing permissions and limitations
# under the License.
-from datetime import timedelta
+import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
-from airflow.utils.dates import days_ago
with DAG(
dag_id="test_missing_owner",
schedule_interval="0 0 * * *",
- start_date=days_ago(2),
- dagrun_timeout=timedelta(minutes=60),
+ start_date=datetime.datetime(2022, 1, 1),
+ dagrun_timeout=datetime.timedelta(minutes=60),
tags=["example"],
) as dag:
run_this_last = EmptyOperator(
diff --git a/tests/dags/test_multiple_dags.py b/tests/dags/test_multiple_dags.py
index 44aa5cdfc2..67ae544123 100644
--- a/tests/dags/test_multiple_dags.py
+++ b/tests/dags/test_multiple_dags.py
@@ -15,13 +15,12 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-from datetime import timedelta
+import datetime
from airflow.models import DAG
from airflow.operators.bash import BashOperator
-from airflow.utils.dates import days_ago
-args = {'owner': 'airflow', 'retries': 3, 'start_date': days_ago(2)}
+args = {'owner': 'airflow', 'retries': 3, 'start_date':
datetime.datetime(2022, 1, 1)}
def create_dag(suffix):
@@ -29,7 +28,7 @@ def create_dag(suffix):
dag_id=f'test_multiple_dags__{suffix}',
default_args=args,
schedule_interval='0 0 * * *',
- dagrun_timeout=timedelta(minutes=60),
+ dagrun_timeout=datetime.timedelta(minutes=60),
)
with dag:
diff --git a/tests/dags/test_with_non_default_owner.py
b/tests/dags/test_with_non_default_owner.py
index eb7d1f3d99..43a2953f9c 100644
--- a/tests/dags/test_with_non_default_owner.py
+++ b/tests/dags/test_with_non_default_owner.py
@@ -16,17 +16,16 @@
# specific language governing permissions and limitations
# under the License.
-from datetime import timedelta
+import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
-from airflow.utils.dates import days_ago
with DAG(
dag_id="test_with_non_default_owner",
schedule_interval="0 0 * * *",
- start_date=days_ago(2),
- dagrun_timeout=timedelta(minutes=60),
+ start_date=datetime.datetime(2022, 1, 1),
+ dagrun_timeout=datetime.timedelta(minutes=60),
tags=["example"],
) as dag:
run_this_last = EmptyOperator(
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index c358ada979..d1d461a4e0 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -23,7 +23,6 @@ import logging
import threading
from unittest.mock import patch
-import pendulum
import pytest
from airflow import settings
@@ -1600,7 +1599,7 @@ class TestBackfillJob:
self.dagbag.process_file(str(TEST_DAGS_FOLDER / f'{dag_id}.py'))
dag = self.dagbag.get_dag(dag_id)
- when = pendulum.today('UTC')
+ when = datetime.datetime(2022, 1, 1)
job = BackfillJob(
dag=dag,
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index b50c8bfd7f..fd32e6dd7d 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3955,7 +3955,6 @@ class TestSchedulerJob:
"""End-to-end test of a simple mapped dag"""
# Use SequentialExecutor for more predictable test behaviour
from airflow.executors.sequential_executor import SequentialExecutor
- from airflow.utils.dates import days_ago
self.dagbag.process_file(str(TEST_DAGS_FOLDER / f'{dag_id}.py'))
dag = self.dagbag.get_dag(dag_id)
@@ -3964,7 +3963,7 @@ class TestSchedulerJob:
run_type=DagRunType.MANUAL,
start_date=timezone.utcnow(),
state=State.RUNNING,
- execution_date=days_ago(2),
+ execution_date=timezone.utcnow() - datetime.timedelta(days=2),
session=session,
)
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index b53df7bf9b..f73f5d1c45 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -37,7 +37,6 @@ from airflow.operators.python import ShortCircuitOperator
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.stats import Stats
from airflow.utils import timezone
-from airflow.utils.dates import days_ago
from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.types import DagRunType
@@ -785,7 +784,7 @@ class TestDagRun:
Tests that dag scheduling delay stat is not called if the dagrun is
not a scheduled run.
This case is manual run. Simple test for coherence check.
"""
- dag = DAG(dag_id='test_dagrun_stats', start_date=days_ago(1))
+ dag = DAG(dag_id='test_dagrun_stats', start_date=DEFAULT_DATE)
dag_task = EmptyOperator(task_id='dummy', dag=dag)
initial_task_states = {
@@ -809,7 +808,7 @@ class TestDagRun:
Tests that dag scheduling delay stat is set properly once running
scheduled dag.
dag_run.update_state() invokes the
_emit_true_scheduling_delay_stats_for_finished_state method.
"""
- dag = DAG(dag_id='test_emit_dag_stats', start_date=days_ago(1),
schedule_interval=schedule_interval)
+ dag = DAG(dag_id='test_emit_dag_stats', start_date=DEFAULT_DATE,
schedule_interval=schedule_interval)
dag_task = EmptyOperator(task_id='dummy', dag=dag, owner='airflow')
try:
@@ -860,7 +859,7 @@ class TestDagRun:
"""
Tests that adding State.failed_states and State.success_states work as
expected.
"""
- dag = DAG(dag_id='test_dagrun_states', start_date=days_ago(1))
+ dag = DAG(dag_id='test_dagrun_states', start_date=DEFAULT_DATE)
dag_task_success = EmptyOperator(task_id='dummy', dag=dag)
dag_task_failed = EmptyOperator(task_id='dummy2', dag=dag)
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index e58b424de6..a925e52fa9 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -42,7 +42,6 @@ from airflow.operators.python import (
)
from airflow.utils import timezone
from airflow.utils.context import AirflowContextDeprecationWarning, Context
-from airflow.utils.dates import days_ago
from airflow.utils.python_virtualenv import prepare_virtualenv
from airflow.utils.session import create_session
from airflow.utils.state import State
@@ -1161,7 +1160,7 @@ class TestPythonVirtualenvOperator(unittest.TestCase):
DEFAULT_ARGS = {
"owner": "test",
"depends_on_past": True,
- "start_date": days_ago(1),
+ "start_date": timezone.datetime(2022, 1, 1),
"end_date": datetime.today(),
"schedule_interval": "@once",
"retries": 1,
diff --git a/tests/providers/google/cloud/operators/test_mlengine.py
b/tests/providers/google/cloud/operators/test_mlengine.py
index e12a7e9621..af7a487f42 100644
--- a/tests/providers/google/cloud/operators/test_mlengine.py
+++ b/tests/providers/google/cloud/operators/test_mlengine.py
@@ -42,7 +42,6 @@ from airflow.providers.google.cloud.operators.mlengine import
(
)
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.utils import timezone
-from airflow.utils.dates import days_ago
DEFAULT_DATE = timezone.datetime(2017, 6, 6)
@@ -410,7 +409,7 @@ class TestMLEngineStartTrainingJobOperator:
'imageUri': 'eu.gcr.io/test-project/test-image:test-version',
},
'task_id': 'test-training',
- 'start_date': days_ago(1),
+ 'start_date': DEFAULT_DATE,
}
request = {
'jobId': 'test_training',
diff --git a/tests/test_utils/perf/dags/perf_dag_1.py
b/tests/test_utils/perf/dags/perf_dag_1.py
index 3757c7d40e..4305ec4f2e 100644
--- a/tests/test_utils/perf/dags/perf_dag_1.py
+++ b/tests/test_utils/perf/dags/perf_dag_1.py
@@ -18,23 +18,27 @@
"""
This dag tests performance of simple bash commands executed with Airflow.
"""
-from datetime import timedelta
+import datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
-from airflow.utils.dates import days_ago
args = {
'owner': 'airflow',
- 'start_date': days_ago(3),
+ 'start_date': datetime.datetime(2022, 1, 1),
}
dag = DAG(
- dag_id='perf_dag_1', default_args=args, schedule_interval='@daily',
dagrun_timeout=timedelta(minutes=60)
+ dag_id='perf_dag_1',
+ default_args=args,
+ schedule_interval='@daily',
+ dagrun_timeout=datetime.timedelta(minutes=60),
)
task_1 = BashOperator(
- task_id='perf_task_1', bash_command='sleep 5; echo "run_id={{ run_id }} |
dag_run={{ dag_run }}"', dag=dag
+ task_id='perf_task_1',
+ bash_command='sleep 5; echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
+ dag=dag,
)
for i in range(2, 5):
diff --git a/tests/test_utils/perf/dags/perf_dag_2.py
b/tests/test_utils/perf/dags/perf_dag_2.py
index 208ea49b92..5b2c024864 100644
--- a/tests/test_utils/perf/dags/perf_dag_2.py
+++ b/tests/test_utils/perf/dags/perf_dag_2.py
@@ -18,23 +18,27 @@
"""
This dag tests performance of simple bash commands executed with Airflow.
"""
-from datetime import timedelta
+import datetime
from airflow.models import DAG
from airflow.operators.bash import BashOperator
-from airflow.utils.dates import days_ago
args = {
'owner': 'airflow',
- 'start_date': days_ago(3),
+ 'start_date': datetime.datetime(2022, 1, 1),
}
dag = DAG(
- dag_id='perf_dag_2', default_args=args, schedule_interval='@daily',
dagrun_timeout=timedelta(minutes=60)
+ dag_id='perf_dag_2',
+ default_args=args,
+ schedule_interval='@daily',
+ dagrun_timeout=datetime.timedelta(minutes=60),
)
task_1 = BashOperator(
- task_id='perf_task_1', bash_command='sleep 5; echo "run_id={{ run_id }} |
dag_run={{ dag_run }}"', dag=dag
+ task_id='perf_task_1',
+ bash_command='sleep 5; echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
+ dag=dag,
)
for i in range(2, 5):
diff --git a/tests/utils/test_task_group.py b/tests/utils/test_task_group.py
index 9a65c8d621..4b47a0c9bb 100644
--- a/tests/utils/test_task_group.py
+++ b/tests/utils/test_task_group.py
@@ -25,7 +25,6 @@ from airflow.models.xcom_arg import XComArg
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
-from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup
from airflow.www.views import dag_edges, task_group_to_dict
from tests.models import DEFAULT_DATE
@@ -1025,7 +1024,7 @@ def test_pass_taskgroup_output_to_task():
def increment(num):
return num + 1
- @dag(schedule_interval=None, start_date=days_ago(1),
default_args={"owner": "airflow"})
+ @dag(schedule_interval=None, start_date=pendulum.DateTime(2022, 1, 1),
default_args={"owner": "airflow"})
def wrap():
total_1 = one()
assert isinstance(total_1, XComArg)
diff --git a/tests/www/views/test_views_decorators.py
b/tests/www/views/test_views_decorators.py
index c211806bd1..0e4fc12857 100644
--- a/tests/www/views/test_views_decorators.py
+++ b/tests/www/views/test_views_decorators.py
@@ -22,7 +22,7 @@ from unittest import mock
import pytest
from airflow.models import DagBag, DagRun, Log, TaskInstance
-from airflow.utils import dates, timezone
+from airflow.utils import timezone
from airflow.utils.state import State
from airflow.utils.types import DagRunType
from airflow.www import app
@@ -30,7 +30,7 @@ from airflow.www.views import action_has_dag_edit_access
from tests.test_utils.db import clear_db_runs
from tests.test_utils.www import check_content_in_response
-EXAMPLE_DAG_DEFAULT_DATE = dates.days_ago(2)
+EXAMPLE_DAG_DEFAULT_DATE = timezone.utcnow().replace(hour=0, minute=0,
second=0, microsecond=0)
@pytest.fixture(scope="module")
diff --git a/tests/www/views/test_views_extra_links.py
b/tests/www/views/test_views_extra_links.py
index b9283923f7..669499b0a4 100644
--- a/tests/www/views/test_views_extra_links.py
+++ b/tests/www/views/test_views_extra_links.py
@@ -24,7 +24,7 @@ import pytest
from airflow.models import DAG
from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
-from airflow.utils import dates, timezone
+from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType
@@ -157,9 +157,8 @@ def test_global_extra_links_works(dag_run, task_1,
viewer_client, session):
def test_extra_link_in_gantt_view(dag, create_dag_run, viewer_client):
- exec_date = dates.days_ago(2)
+ exec_date = timezone.datetime(2022, 1, 1)
start_date = timezone.datetime(2020, 4, 10, 2, 0, 0)
- end_date = exec_date + datetime.timedelta(seconds=30)
with create_session() as session:
dag_run = create_dag_run(execution_date=exec_date, session=session)
@@ -167,7 +166,7 @@ def test_extra_link_in_gantt_view(dag, create_dag_run,
viewer_client):
ti.refresh_from_task(dag.get_task(ti.task_id))
ti.state = TaskInstanceState.SUCCESS
ti.start_date = start_date
- ti.end_date = end_date
+ ti.end_date = start_date + datetime.timedelta(seconds=30)
session.merge(ti)
url = f'gantt?dag_id={dag.dag_id}&execution_date={exec_date}'
diff --git a/tests/www/views/test_views_task_norun.py
b/tests/www/views/test_views_task_norun.py
index 4f5dc092b3..3790148fc9 100644
--- a/tests/www/views/test_views_task_norun.py
+++ b/tests/www/views/test_views_task_norun.py
@@ -15,14 +15,14 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import datetime
import urllib.parse
import pytest
-from airflow.utils import dates
from tests.test_utils.db import clear_db_runs
-DEFAULT_DATE = dates.days_ago(2)
+DEFAULT_DATE = datetime.datetime(2022, 1, 1)
DEFAULT_VAL = urllib.parse.quote_plus(str(DEFAULT_DATE))
diff --git a/tests/www/views/test_views_tasks.py
b/tests/www/views/test_views_tasks.py
index ebed9ab05f..3428ac848a 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -32,7 +32,7 @@ from airflow.models.dagcode import DagCode
from airflow.operators.bash import BashOperator
from airflow.security import permissions
from airflow.ti_deps.dependencies_states import QUEUEABLE_STATES,
RUNNABLE_STATES
-from airflow.utils import dates, timezone
+from airflow.utils import timezone
from airflow.utils.log.logging_mixin import ExternalLoggingMixin
from airflow.utils.session import create_session
from airflow.utils.state import State
@@ -43,7 +43,7 @@ from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_runs
from tests.test_utils.www import check_content_in_response,
check_content_not_in_response, client_with_login
-DEFAULT_DATE = dates.days_ago(2)
+DEFAULT_DATE = timezone.utcnow().replace(hour=0, minute=0, second=0,
microsecond=0)
DEFAULT_VAL = urllib.parse.quote_plus(str(DEFAULT_DATE))