This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f352ee63a5 Replaced all days_ago functions with datetime functions 
(#23237)
f352ee63a5 is described below

commit f352ee63a5d09546a7997ba8f2f8702a1ddb4af7
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Mon May 23 11:32:52 2022 -0400

    Replaced all days_ago functions with datetime functions (#23237)
    
    Co-authored-by: Dev232001 <[email protected]>
---
 airflow/example_dags/example_subdag_operator.py    |  5 ++--
 tests/api/common/test_delete_dag.py                |  7 ++---
 tests/api/common/test_mark_tasks.py                | 21 ++++++++------
 .../endpoints/test_extra_link_endpoint.py          | 33 +++++++++-------------
 tests/cli/commands/test_task_command.py            | 12 ++++----
 tests/dag_processing/test_processor.py             | 18 ++++++------
 tests/dags/test_default_views.py                   |  6 ++--
 tests/dags/test_example_bash_operator.py           |  9 ++----
 tests/dags/test_mapped_classic.py                  |  5 ++--
 tests/dags/test_mapped_taskflow.py                 |  5 ++--
 tests/dags/test_miscellaneous.py                   |  7 ++---
 tests/dags/test_missing_owner.py                   |  7 ++---
 tests/dags/test_multiple_dags.py                   |  7 ++---
 tests/dags/test_with_non_default_owner.py          |  7 ++---
 tests/jobs/test_backfill_job.py                    |  3 +-
 tests/jobs/test_scheduler_job.py                   |  3 +-
 tests/models/test_dagrun.py                        |  7 ++---
 tests/operators/test_python.py                     |  3 +-
 .../google/cloud/operators/test_mlengine.py        |  3 +-
 tests/test_utils/perf/dags/perf_dag_1.py           | 14 +++++----
 tests/test_utils/perf/dags/perf_dag_2.py           | 14 +++++----
 tests/utils/test_task_group.py                     |  3 +-
 tests/www/views/test_views_decorators.py           |  4 +--
 tests/www/views/test_views_extra_links.py          |  7 ++---
 tests/www/views/test_views_task_norun.py           |  4 +--
 tests/www/views/test_views_tasks.py                |  4 +--
 26 files changed, 105 insertions(+), 113 deletions(-)

diff --git a/airflow/example_dags/example_subdag_operator.py 
b/airflow/example_dags/example_subdag_operator.py
index 84a303815a..79d369d638 100644
--- a/airflow/example_dags/example_subdag_operator.py
+++ b/airflow/example_dags/example_subdag_operator.py
@@ -19,18 +19,19 @@
 """Example DAG demonstrating the usage of the SubDagOperator."""
 
 # [START example_subdag_operator]
+import datetime
+
 from airflow import DAG
 from airflow.example_dags.subdags.subdag import subdag
 from airflow.operators.empty import EmptyOperator
 from airflow.operators.subdag import SubDagOperator
-from airflow.utils.dates import days_ago
 
 DAG_NAME = 'example_subdag_operator'
 
 with DAG(
     dag_id=DAG_NAME,
     default_args={"retries": 2},
-    start_date=days_ago(2),
+    start_date=datetime.datetime(2022, 1, 1),
     schedule_interval="@once",
     tags=['example'],
 ) as dag:
diff --git a/tests/api/common/test_delete_dag.py 
b/tests/api/common/test_delete_dag.py
index 2830020d29..cf51856409 100644
--- a/tests/api/common/test_delete_dag.py
+++ b/tests/api/common/test_delete_dag.py
@@ -16,14 +16,13 @@
 # specific language governing permissions and limitations
 # under the License.
 
-
 import pytest
 
 from airflow import models
 from airflow.api.common.delete_dag import delete_dag
 from airflow.exceptions import AirflowException, DagNotFound
 from airflow.operators.empty import EmptyOperator
-from airflow.utils.dates import days_ago
+from airflow.utils import timezone
 from airflow.utils.session import create_session
 from airflow.utils.state import State
 from airflow.utils.types import DagRunType
@@ -73,11 +72,11 @@ class TestDeleteDAGSuccessfulDelete:
 
         task = EmptyOperator(
             task_id='dummy',
-            dag=models.DAG(dag_id=self.key, default_args={'start_date': 
days_ago(2)}),
+            dag=models.DAG(dag_id=self.key, default_args={'start_date': 
timezone.datetime(2022, 1, 1)}),
             owner='airflow',
         )
 
-        test_date = days_ago(1)
+        test_date = timezone.datetime(2022, 1, 1)
         with create_session() as session:
             session.add(DM(dag_id=self.key, fileloc=self.dag_file_path, 
is_subdag=for_sub_dag))
             dr = DR(dag_id=self.key, run_type=DagRunType.MANUAL, 
run_id="test", execution_date=test_date)
diff --git a/tests/api/common/test_mark_tasks.py 
b/tests/api/common/test_mark_tasks.py
index 77064560f2..fee33a50e8 100644
--- a/tests/api/common/test_mark_tasks.py
+++ b/tests/api/common/test_mark_tasks.py
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from datetime import timedelta
+import datetime
 from typing import Callable
 
 import pytest
@@ -34,7 +34,6 @@ from airflow.api.common.mark_tasks import (
 )
 from airflow.models import DagRun
 from airflow.utils import timezone
-from airflow.utils.dates import days_ago
 from airflow.utils.session import create_session, provide_session
 from airflow.utils.state import State
 from airflow.utils.types import DagRunType
@@ -62,12 +61,12 @@ class TestMarkTasks:
         cls.dag2 = dagbag.get_dag('example_subdag_operator')
         cls.dag3 = dagbag.get_dag('example_trigger_target_dag')
         cls.dag4 = dagbag.get_dag('test_mapped_classic')
-        cls.execution_dates = [days_ago(2), days_ago(1)]
+        cls.execution_dates = [timezone.datetime(2022, 1, 1), 
timezone.datetime(2022, 1, 2)]
         start_date3 = cls.dag3.start_date
         cls.dag3_execution_dates = [
             start_date3,
-            start_date3 + timedelta(days=1),
-            start_date3 + timedelta(days=2),
+            start_date3 + datetime.timedelta(days=1),
+            start_date3 + datetime.timedelta(days=2),
         ]
 
     @pytest.fixture(autouse=True)
@@ -76,7 +75,7 @@ class TestMarkTasks:
         clear_db_runs()
         drs = _create_dagruns(
             self.dag1,
-            [_DagRunInfo(d, (d, d + timedelta(days=1))) for d in 
self.execution_dates],
+            [_DagRunInfo(d, (d, d + datetime.timedelta(days=1))) for d in 
self.execution_dates],
             state=State.RUNNING,
             run_type=DagRunType.SCHEDULED,
         )
@@ -88,7 +87,7 @@ class TestMarkTasks:
             [
                 _DagRunInfo(
                     self.dag2.start_date,
-                    (self.dag2.start_date, self.dag2.start_date + 
timedelta(days=1)),
+                    (self.dag2.start_date, self.dag2.start_date + 
datetime.timedelta(days=1)),
                 ),
             ],
             state=State.RUNNING,
@@ -112,7 +111,7 @@ class TestMarkTasks:
             [
                 _DagRunInfo(
                     self.dag4.start_date,
-                    (self.dag4.start_date, self.dag4.start_date + 
timedelta(days=1)),
+                    (self.dag4.start_date, self.dag4.start_date + 
datetime.timedelta(days=1)),
                 )
             ],
             state=State.SUCCESS,
@@ -482,7 +481,11 @@ class TestMarkDAGRun:
         cls.dag1.sync_to_db()
         cls.dag2 = dagbag.dags['example_subdag_operator']
         cls.dag2.sync_to_db()
-        cls.execution_dates = [days_ago(2), days_ago(1), days_ago(0)]
+        cls.execution_dates = [
+            timezone.datetime(2022, 1, 1),
+            timezone.datetime(2022, 1, 2),
+            timezone.datetime(2022, 1, 3),
+        ]
 
     def setup_method(self):
         clear_db_runs()
diff --git a/tests/api_connexion/endpoints/test_extra_link_endpoint.py 
b/tests/api_connexion/endpoints/test_extra_link_endpoint.py
index a1eb3b8a54..c209f661a8 100644
--- a/tests/api_connexion/endpoints/test_extra_link_endpoint.py
+++ b/tests/api_connexion/endpoints/test_extra_link_endpoint.py
@@ -14,11 +14,11 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
 import os
 from urllib.parse import quote_plus
 
 import pytest
-from parameterized import parameterized
 
 from airflow import DAG
 from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
@@ -28,8 +28,8 @@ from airflow.models.xcom import XCom
 from airflow.plugins_manager import AirflowPlugin
 from airflow.providers.google.cloud.operators.bigquery import 
BigQueryExecuteQueryOperator
 from airflow.security import permissions
+from airflow.utils import timezone
 from airflow.utils.state import DagRunState
-from airflow.utils.timezone import datetime
 from airflow.utils.types import DagRunType
 from tests.test_utils.api_connexion_utils import create_user, delete_user
 from tests.test_utils.db import clear_db_runs, clear_db_xcom
@@ -61,7 +61,7 @@ def configured_app(minimal_app_for_api):
 class TestGetExtraLinks:
     @pytest.fixture(autouse=True)
     def setup_attrs(self, configured_app, session) -> None:
-        self.default_time = datetime(2020, 1, 1)
+        self.default_time = timezone.datetime(2020, 1, 1)
 
         clear_db_runs()
         clear_db_xcom()
@@ -90,40 +90,35 @@ class TestGetExtraLinks:
         clear_db_xcom()
 
     def _create_dag(self):
-        with DAG(
-            dag_id="TEST_DAG_ID",
-            default_args=dict(
-                start_date=self.default_time,
-            ),
-        ) as dag:
+        with DAG(dag_id="TEST_DAG_ID", default_args={"start_date": 
self.default_time}) as dag:
             BigQueryExecuteQueryOperator(task_id="TEST_SINGLE_QUERY", 
sql="SELECT 1")
             BigQueryExecuteQueryOperator(task_id="TEST_MULTIPLE_QUERY", 
sql=["SELECT 1", "SELECT 2"])
         return dag
 
-    @parameterized.expand(
+    @pytest.mark.parametrize(
+        "url, expected_title, expected_detail",
         [
-            (
-                "missing_dag",
+            pytest.param(
                 
"/api/v1/dags/INVALID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_SINGLE_QUERY/links",
                 "DAG not found",
                 'DAG with ID = "INVALID" not found',
+                id="missing_dag",
             ),
-            (
-                "missing_dag_run",
+            pytest.param(
                 
"/api/v1/dags/TEST_DAG_ID/dagRuns/INVALID/taskInstances/TEST_SINGLE_QUERY/links",
                 "DAG Run not found",
                 'DAG Run with ID = "INVALID" not found',
+                id="missing_dag_run",
             ),
-            (
-                "missing_task",
+            pytest.param(
                 
"/api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/INVALID/links",
                 "Task not found",
                 'Task with ID = "INVALID" not found',
+                id="missing_task",
             ),
-        ]
+        ],
     )
-    def test_should_respond_404(self, name, url, expected_title, 
expected_detail):
-        del name
+    def test_should_respond_404(self, url, expected_title, expected_detail):
         response = self.client.get(url, environ_overrides={'REMOTE_USER': 
"test"})
 
         assert 404 == response.status_code
diff --git a/tests/cli/commands/test_task_command.py 
b/tests/cli/commands/test_task_command.py
index ffcc860388..1f4662b753 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -15,7 +15,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
+
 import io
 import json
 import logging
@@ -24,7 +24,6 @@ import re
 import unittest
 from argparse import ArgumentParser
 from contextlib import redirect_stdout
-from datetime import datetime
 from unittest import mock
 
 import pytest
@@ -38,14 +37,13 @@ from airflow.exceptions import AirflowException, 
DagRunNotFound
 from airflow.models import DagBag, DagRun, Pool, TaskInstance
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.utils import timezone
-from airflow.utils.dates import days_ago
 from airflow.utils.session import create_session
 from airflow.utils.state import State
 from airflow.utils.types import DagRunType
 from tests.test_utils.config import conf_vars
 from tests.test_utils.db import clear_db_pools, clear_db_runs
 
-DEFAULT_DATE = days_ago(1)
+DEFAULT_DATE = timezone.datetime(2022, 1, 1)
 ROOT_FOLDER = os.path.realpath(
     os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir, 
os.pardir)
 )
@@ -374,7 +372,7 @@ class TestCliTasks(unittest.TestCase):
 
         dag2 = DagBag().dags['example_python_operator']
         task2 = dag2.get_task(task_id='print_the_context')
-        default_date2 = timezone.make_aware(datetime(2016, 1, 9))
+        default_date2 = timezone.datetime(2016, 1, 9)
         dag2.clear()
         dagrun = dag2.create_dagrun(
             state=State.RUNNING,
@@ -417,7 +415,7 @@ class TestCliTasks(unittest.TestCase):
         task_states_for_dag_run should return an AirflowException when invalid 
dag id is passed
         """
         with pytest.raises(DagRunNotFound):
-            default_date2 = timezone.make_aware(datetime(2016, 1, 9))
+            default_date2 = timezone.datetime(2016, 1, 9)
             task_command.task_states_for_dag_run(
                 self.parser.parse_args(
                     [
@@ -455,7 +453,7 @@ class TestLogsfromTaskRunCommand(unittest.TestCase):
         self.run_id = "test_run"
         self.dag_path = os.path.join(ROOT_FOLDER, "dags", 
"test_logging_in_dag.py")
         reset(self.dag_id)
-        self.execution_date = timezone.make_aware(datetime(2017, 1, 1))
+        self.execution_date = timezone.datetime(2017, 1, 1)
         self.execution_date_str = self.execution_date.isoformat()
         self.task_args = ['tasks', 'run', self.dag_id, self.task_id, 
'--local', self.execution_date_str]
         self.log_dir = conf.get_mandatory_value('logging', 'base_log_folder')
diff --git a/tests/dag_processing/test_processor.py 
b/tests/dag_processing/test_processor.py
index 9b8716bded..e2f9165131 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -16,7 +16,6 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-
 import datetime
 import os
 from unittest import mock
@@ -34,7 +33,6 @@ from airflow.models import DagBag, DagModel, SlaMiss, 
TaskInstance, errors
 from airflow.models.taskinstance import SimpleTaskInstance
 from airflow.operators.empty import EmptyOperator
 from airflow.utils import timezone
-from airflow.utils.dates import days_ago
 from airflow.utils.session import create_session
 from airflow.utils.state import State
 from airflow.utils.types import DagRunType
@@ -113,7 +111,7 @@ class TestDagFileProcessor:
 
         # Create dag with a start of 1 day ago, but an sla of 0
         # so we'll already have an sla_miss on the books.
-        test_start_date = days_ago(1)
+        test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
         dag, task = create_dummy_dag(
             dag_id='test_sla_miss',
             task_id='dummy',
@@ -142,7 +140,7 @@ class TestDagFileProcessor:
         # Create dag with a start of 1 day ago, but an sla of 0
         # so we'll already have an sla_miss on the books.
         # Pass anything besides a timedelta object to the sla argument.
-        test_start_date = days_ago(1)
+        test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
         dag, task = create_dummy_dag(
             dag_id='test_sla_miss',
             task_id='dummy',
@@ -170,7 +168,7 @@ class TestDagFileProcessor:
 
         # Create dag with a start of 2 days ago, but an sla of 1 day
         # ago so we'll already have an sla_miss on the books
-        test_start_date = days_ago(2)
+        test_start_date = timezone.utcnow() - datetime.timedelta(days=2)
         dag, task = create_dummy_dag(
             dag_id='test_sla_miss',
             task_id='dummy',
@@ -206,7 +204,7 @@ class TestDagFileProcessor:
 
         # Create dag with a start of 2 days ago, but an sla of 1 day
         # ago so we'll already have an sla_miss on the books
-        test_start_date = days_ago(2)
+        test_start_date = timezone.utcnow() - datetime.timedelta(days=2)
         with dag_maker(
             dag_id='test_sla_miss',
             default_args={'start_date': test_start_date, 'sla': 
datetime.timedelta(days=1)},
@@ -247,7 +245,7 @@ class TestDagFileProcessor:
 
         sla_callback = MagicMock(side_effect=RuntimeError('Could not call 
function'))
 
-        test_start_date = days_ago(2)
+        test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
         dag, task = create_dummy_dag(
             dag_id='test_sla_miss',
             task_id='dummy',
@@ -277,7 +275,7 @@ class TestDagFileProcessor:
     ):
         session = settings.Session()
 
-        test_start_date = days_ago(2)
+        test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
         email1 = '[email protected]'
         dag, task = create_dummy_dag(
             dag_id='test_sla_miss',
@@ -317,7 +315,7 @@ class TestDagFileProcessor:
         # Mock the callback function so we can verify that it was not called
         mock_send_email.side_effect = RuntimeError('Could not send an email')
 
-        test_start_date = days_ago(2)
+        test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
         dag, task = create_dummy_dag(
             dag_id='test_sla_miss',
             task_id='dummy',
@@ -347,7 +345,7 @@ class TestDagFileProcessor:
         """
         session = settings.Session()
 
-        test_start_date = days_ago(2)
+        test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
         dag, task = create_dummy_dag(
             dag_id='test_sla_miss',
             task_id='dummy',
diff --git a/tests/dags/test_default_views.py b/tests/dags/test_default_views.py
index ca51c10e62..6a1aefb4ee 100644
--- a/tests/dags/test_default_views.py
+++ b/tests/dags/test_default_views.py
@@ -15,10 +15,12 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
+import pendulum
+
 from airflow.models import DAG
-from airflow.utils.dates import days_ago
 
-args = {'owner': 'airflow', 'retries': 3, 'start_date': days_ago(2)}
+args = {'owner': 'airflow', 'retries': 3, 'start_date': 
pendulum.datetime(2022, 1, 1)}
 
 tree_dag = DAG(
     dag_id='test_tree_view',
diff --git a/tests/dags/test_example_bash_operator.py 
b/tests/dags/test_example_bash_operator.py
index 4e44f55ff6..288667ad06 100644
--- a/tests/dags/test_example_bash_operator.py
+++ b/tests/dags/test_example_bash_operator.py
@@ -15,20 +15,17 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from datetime import timedelta
+import datetime
 
 from airflow.models import DAG
 from airflow.operators.bash import BashOperator
 from airflow.operators.empty import EmptyOperator
-from airflow.utils.dates import days_ago
-
-args = {'owner': 'airflow', 'retries': 3, 'start_date': days_ago(2)}
 
 dag = DAG(
     dag_id='test_example_bash_operator',
-    default_args=args,
+    default_args={'owner': 'airflow', 'retries': 3, 'start_date': 
datetime.datetime(2022, 1, 1)},
     schedule_interval='0 0 * * *',
-    dagrun_timeout=timedelta(minutes=60),
+    dagrun_timeout=datetime.timedelta(minutes=60),
 )
 
 cmd = 'ls -l'
diff --git a/tests/dags/test_mapped_classic.py 
b/tests/dags/test_mapped_classic.py
index cbf3a8a5b8..c3e0d413be 100644
--- a/tests/dags/test_mapped_classic.py
+++ b/tests/dags/test_mapped_classic.py
@@ -15,10 +15,11 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import datetime
+
 from airflow import DAG
 from airflow.decorators import task
 from airflow.operators.python import PythonOperator
-from airflow.utils.dates import days_ago
 
 
 @task
@@ -30,7 +31,7 @@ def consumer(value):
     print(repr(value))
 
 
-with DAG(dag_id='test_mapped_classic', start_date=days_ago(2)) as dag:
+with DAG(dag_id='test_mapped_classic', start_date=datetime.datetime(2022, 1, 
1)) as dag:
     PythonOperator.partial(task_id='consumer', 
python_callable=consumer).expand(op_args=make_arg_lists())
     PythonOperator.partial(task_id='consumer_literal', 
python_callable=consumer).expand(
         op_args=[[1], [2], [3]],
diff --git a/tests/dags/test_mapped_taskflow.py 
b/tests/dags/test_mapped_taskflow.py
index e4e796c3e4..a803d9afbb 100644
--- a/tests/dags/test_mapped_taskflow.py
+++ b/tests/dags/test_mapped_taskflow.py
@@ -15,10 +15,11 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import datetime
+
 from airflow import DAG
-from airflow.utils.dates import days_ago
 
-with DAG(dag_id='test_mapped_taskflow', start_date=days_ago(2)) as dag:
+with DAG(dag_id='test_mapped_taskflow', start_date=datetime.datetime(2022, 1, 
1)) as dag:
 
     @dag.task
     def make_list():
diff --git a/tests/dags/test_miscellaneous.py b/tests/dags/test_miscellaneous.py
index 7f77340e62..2174a61478 100644
--- a/tests/dags/test_miscellaneous.py
+++ b/tests/dags/test_miscellaneous.py
@@ -18,12 +18,11 @@
 
 """Example DAG demonstrating the usage of the BashOperator."""
 
-from datetime import timedelta
+import datetime
 
 from airflow import DAG
 from airflow.operators.bash import BashOperator
 from airflow.operators.empty import EmptyOperator
-from airflow.utils.dates import days_ago
 
 args = {
     'owner': 'airflow',
@@ -33,8 +32,8 @@ dag = DAG(
     dag_id='miscellaneous_test_dag',
     default_args=args,
     schedule_interval='0 0 * * *',
-    start_date=days_ago(2),
-    dagrun_timeout=timedelta(minutes=60),
+    start_date=datetime.datetime(2022, 1, 1),
+    dagrun_timeout=datetime.timedelta(minutes=60),
     tags=['example', 'example2'],
     params={"example_key": "example_value"},
 )
diff --git a/tests/dags/test_missing_owner.py b/tests/dags/test_missing_owner.py
index dc70a5b757..cc20db08c6 100644
--- a/tests/dags/test_missing_owner.py
+++ b/tests/dags/test_missing_owner.py
@@ -16,17 +16,16 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from datetime import timedelta
+import datetime
 
 from airflow import DAG
 from airflow.operators.empty import EmptyOperator
-from airflow.utils.dates import days_ago
 
 with DAG(
     dag_id="test_missing_owner",
     schedule_interval="0 0 * * *",
-    start_date=days_ago(2),
-    dagrun_timeout=timedelta(minutes=60),
+    start_date=datetime.datetime(2022, 1, 1),
+    dagrun_timeout=datetime.timedelta(minutes=60),
     tags=["example"],
 ) as dag:
     run_this_last = EmptyOperator(
diff --git a/tests/dags/test_multiple_dags.py b/tests/dags/test_multiple_dags.py
index 44aa5cdfc2..67ae544123 100644
--- a/tests/dags/test_multiple_dags.py
+++ b/tests/dags/test_multiple_dags.py
@@ -15,13 +15,12 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from datetime import timedelta
+import datetime
 
 from airflow.models import DAG
 from airflow.operators.bash import BashOperator
-from airflow.utils.dates import days_ago
 
-args = {'owner': 'airflow', 'retries': 3, 'start_date': days_ago(2)}
+args = {'owner': 'airflow', 'retries': 3, 'start_date': 
datetime.datetime(2022, 1, 1)}
 
 
 def create_dag(suffix):
@@ -29,7 +28,7 @@ def create_dag(suffix):
         dag_id=f'test_multiple_dags__{suffix}',
         default_args=args,
         schedule_interval='0 0 * * *',
-        dagrun_timeout=timedelta(minutes=60),
+        dagrun_timeout=datetime.timedelta(minutes=60),
     )
 
     with dag:
diff --git a/tests/dags/test_with_non_default_owner.py 
b/tests/dags/test_with_non_default_owner.py
index eb7d1f3d99..43a2953f9c 100644
--- a/tests/dags/test_with_non_default_owner.py
+++ b/tests/dags/test_with_non_default_owner.py
@@ -16,17 +16,16 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from datetime import timedelta
+import datetime
 
 from airflow import DAG
 from airflow.operators.empty import EmptyOperator
-from airflow.utils.dates import days_ago
 
 with DAG(
     dag_id="test_with_non_default_owner",
     schedule_interval="0 0 * * *",
-    start_date=days_ago(2),
-    dagrun_timeout=timedelta(minutes=60),
+    start_date=datetime.datetime(2022, 1, 1),
+    dagrun_timeout=datetime.timedelta(minutes=60),
     tags=["example"],
 ) as dag:
     run_this_last = EmptyOperator(
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index c358ada979..d1d461a4e0 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -23,7 +23,6 @@ import logging
 import threading
 from unittest.mock import patch
 
-import pendulum
 import pytest
 
 from airflow import settings
@@ -1600,7 +1599,7 @@ class TestBackfillJob:
         self.dagbag.process_file(str(TEST_DAGS_FOLDER / f'{dag_id}.py'))
         dag = self.dagbag.get_dag(dag_id)
 
-        when = pendulum.today('UTC')
+        when = datetime.datetime(2022, 1, 1)
 
         job = BackfillJob(
             dag=dag,
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index b50c8bfd7f..fd32e6dd7d 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3955,7 +3955,6 @@ class TestSchedulerJob:
         """End-to-end test of a simple mapped dag"""
         # Use SequentialExecutor for more predictable test behaviour
         from airflow.executors.sequential_executor import SequentialExecutor
-        from airflow.utils.dates import days_ago
 
         self.dagbag.process_file(str(TEST_DAGS_FOLDER / f'{dag_id}.py'))
         dag = self.dagbag.get_dag(dag_id)
@@ -3964,7 +3963,7 @@ class TestSchedulerJob:
             run_type=DagRunType.MANUAL,
             start_date=timezone.utcnow(),
             state=State.RUNNING,
-            execution_date=days_ago(2),
+            execution_date=timezone.utcnow() - datetime.timedelta(days=2),
             session=session,
         )
 
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index b53df7bf9b..f73f5d1c45 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -37,7 +37,6 @@ from airflow.operators.python import ShortCircuitOperator
 from airflow.serialization.serialized_objects import SerializedDAG
 from airflow.stats import Stats
 from airflow.utils import timezone
-from airflow.utils.dates import days_ago
 from airflow.utils.state import DagRunState, State, TaskInstanceState
 from airflow.utils.trigger_rule import TriggerRule
 from airflow.utils.types import DagRunType
@@ -785,7 +784,7 @@ class TestDagRun:
         Tests that dag scheduling delay stat is not called if the dagrun is 
not a scheduled run.
         This case is manual run. Simple test for coherence check.
         """
-        dag = DAG(dag_id='test_dagrun_stats', start_date=days_ago(1))
+        dag = DAG(dag_id='test_dagrun_stats', start_date=DEFAULT_DATE)
         dag_task = EmptyOperator(task_id='dummy', dag=dag)
 
         initial_task_states = {
@@ -809,7 +808,7 @@ class TestDagRun:
         Tests that dag scheduling delay stat is set properly once running 
scheduled dag.
         dag_run.update_state() invokes the 
_emit_true_scheduling_delay_stats_for_finished_state method.
         """
-        dag = DAG(dag_id='test_emit_dag_stats', start_date=days_ago(1), 
schedule_interval=schedule_interval)
+        dag = DAG(dag_id='test_emit_dag_stats', start_date=DEFAULT_DATE, 
schedule_interval=schedule_interval)
         dag_task = EmptyOperator(task_id='dummy', dag=dag, owner='airflow')
 
         try:
@@ -860,7 +859,7 @@ class TestDagRun:
         """
         Tests that adding State.failed_states and State.success_states work as 
expected.
         """
-        dag = DAG(dag_id='test_dagrun_states', start_date=days_ago(1))
+        dag = DAG(dag_id='test_dagrun_states', start_date=DEFAULT_DATE)
         dag_task_success = EmptyOperator(task_id='dummy', dag=dag)
         dag_task_failed = EmptyOperator(task_id='dummy2', dag=dag)
 
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index e58b424de6..a925e52fa9 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -42,7 +42,6 @@ from airflow.operators.python import (
 )
 from airflow.utils import timezone
 from airflow.utils.context import AirflowContextDeprecationWarning, Context
-from airflow.utils.dates import days_ago
 from airflow.utils.python_virtualenv import prepare_virtualenv
 from airflow.utils.session import create_session
 from airflow.utils.state import State
@@ -1161,7 +1160,7 @@ class TestPythonVirtualenvOperator(unittest.TestCase):
 DEFAULT_ARGS = {
     "owner": "test",
     "depends_on_past": True,
-    "start_date": days_ago(1),
+    "start_date": timezone.datetime(2022, 1, 1),
     "end_date": datetime.today(),
     "schedule_interval": "@once",
     "retries": 1,
diff --git a/tests/providers/google/cloud/operators/test_mlengine.py 
b/tests/providers/google/cloud/operators/test_mlengine.py
index e12a7e9621..af7a487f42 100644
--- a/tests/providers/google/cloud/operators/test_mlengine.py
+++ b/tests/providers/google/cloud/operators/test_mlengine.py
@@ -42,7 +42,6 @@ from airflow.providers.google.cloud.operators.mlengine import 
(
 )
 from airflow.serialization.serialized_objects import SerializedDAG
 from airflow.utils import timezone
-from airflow.utils.dates import days_ago
 
 DEFAULT_DATE = timezone.datetime(2017, 6, 6)
 
@@ -410,7 +409,7 @@ class TestMLEngineStartTrainingJobOperator:
                 'imageUri': 'eu.gcr.io/test-project/test-image:test-version',
             },
             'task_id': 'test-training',
-            'start_date': days_ago(1),
+            'start_date': DEFAULT_DATE,
         }
         request = {
             'jobId': 'test_training',
diff --git a/tests/test_utils/perf/dags/perf_dag_1.py 
b/tests/test_utils/perf/dags/perf_dag_1.py
index 3757c7d40e..4305ec4f2e 100644
--- a/tests/test_utils/perf/dags/perf_dag_1.py
+++ b/tests/test_utils/perf/dags/perf_dag_1.py
@@ -18,23 +18,27 @@
 """
 This dag tests performance of simple bash commands executed with Airflow.
 """
-from datetime import timedelta
+import datetime
 
 from airflow.models import DAG
 from airflow.operators.bash_operator import BashOperator
-from airflow.utils.dates import days_ago
 
 args = {
     'owner': 'airflow',
-    'start_date': days_ago(3),
+    'start_date': datetime.datetime(2022, 1, 1),
 }
 
 dag = DAG(
-    dag_id='perf_dag_1', default_args=args, schedule_interval='@daily', 
dagrun_timeout=timedelta(minutes=60)
+    dag_id='perf_dag_1',
+    default_args=args,
+    schedule_interval='@daily',
+    dagrun_timeout=datetime.timedelta(minutes=60),
 )
 
 task_1 = BashOperator(
-    task_id='perf_task_1', bash_command='sleep 5; echo "run_id={{ run_id }} | 
dag_run={{ dag_run }}"', dag=dag
+    task_id='perf_task_1',
+    bash_command='sleep 5; echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
+    dag=dag,
 )
 
 for i in range(2, 5):
diff --git a/tests/test_utils/perf/dags/perf_dag_2.py 
b/tests/test_utils/perf/dags/perf_dag_2.py
index 208ea49b92..5b2c024864 100644
--- a/tests/test_utils/perf/dags/perf_dag_2.py
+++ b/tests/test_utils/perf/dags/perf_dag_2.py
@@ -18,23 +18,27 @@
 """
 This dag tests performance of simple bash commands executed with Airflow.
 """
-from datetime import timedelta
+import datetime
 
 from airflow.models import DAG
 from airflow.operators.bash import BashOperator
-from airflow.utils.dates import days_ago
 
 args = {
     'owner': 'airflow',
-    'start_date': days_ago(3),
+    'start_date': datetime.datetime(2022, 1, 1),
 }
 
 dag = DAG(
-    dag_id='perf_dag_2', default_args=args, schedule_interval='@daily', 
dagrun_timeout=timedelta(minutes=60)
+    dag_id='perf_dag_2',
+    default_args=args,
+    schedule_interval='@daily',
+    dagrun_timeout=datetime.timedelta(minutes=60),
 )
 
 task_1 = BashOperator(
-    task_id='perf_task_1', bash_command='sleep 5; echo "run_id={{ run_id }} | 
dag_run={{ dag_run }}"', dag=dag
+    task_id='perf_task_1',
+    bash_command='sleep 5; echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
+    dag=dag,
 )
 
 for i in range(2, 5):
diff --git a/tests/utils/test_task_group.py b/tests/utils/test_task_group.py
index 9a65c8d621..4b47a0c9bb 100644
--- a/tests/utils/test_task_group.py
+++ b/tests/utils/test_task_group.py
@@ -25,7 +25,6 @@ from airflow.models.xcom_arg import XComArg
 from airflow.operators.bash import BashOperator
 from airflow.operators.empty import EmptyOperator
 from airflow.operators.python import PythonOperator
-from airflow.utils.dates import days_ago
 from airflow.utils.task_group import TaskGroup
 from airflow.www.views import dag_edges, task_group_to_dict
 from tests.models import DEFAULT_DATE
@@ -1025,7 +1024,7 @@ def test_pass_taskgroup_output_to_task():
     def increment(num):
         return num + 1
 
-    @dag(schedule_interval=None, start_date=days_ago(1), 
default_args={"owner": "airflow"})
+    @dag(schedule_interval=None, start_date=pendulum.DateTime(2022, 1, 1), 
default_args={"owner": "airflow"})
     def wrap():
         total_1 = one()
         assert isinstance(total_1, XComArg)
diff --git a/tests/www/views/test_views_decorators.py 
b/tests/www/views/test_views_decorators.py
index c211806bd1..0e4fc12857 100644
--- a/tests/www/views/test_views_decorators.py
+++ b/tests/www/views/test_views_decorators.py
@@ -22,7 +22,7 @@ from unittest import mock
 import pytest
 
 from airflow.models import DagBag, DagRun, Log, TaskInstance
-from airflow.utils import dates, timezone
+from airflow.utils import timezone
 from airflow.utils.state import State
 from airflow.utils.types import DagRunType
 from airflow.www import app
@@ -30,7 +30,7 @@ from airflow.www.views import action_has_dag_edit_access
 from tests.test_utils.db import clear_db_runs
 from tests.test_utils.www import check_content_in_response
 
-EXAMPLE_DAG_DEFAULT_DATE = dates.days_ago(2)
+EXAMPLE_DAG_DEFAULT_DATE = timezone.utcnow().replace(hour=0, minute=0, 
second=0, microsecond=0)
 
 
 @pytest.fixture(scope="module")
diff --git a/tests/www/views/test_views_extra_links.py 
b/tests/www/views/test_views_extra_links.py
index b9283923f7..669499b0a4 100644
--- a/tests/www/views/test_views_extra_links.py
+++ b/tests/www/views/test_views_extra_links.py
@@ -24,7 +24,7 @@ import pytest
 
 from airflow.models import DAG
 from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
-from airflow.utils import dates, timezone
+from airflow.utils import timezone
 from airflow.utils.session import create_session
 from airflow.utils.state import DagRunState, TaskInstanceState
 from airflow.utils.types import DagRunType
@@ -157,9 +157,8 @@ def test_global_extra_links_works(dag_run, task_1, 
viewer_client, session):
 
 
 def test_extra_link_in_gantt_view(dag, create_dag_run, viewer_client):
-    exec_date = dates.days_ago(2)
+    exec_date = timezone.datetime(2022, 1, 1)
     start_date = timezone.datetime(2020, 4, 10, 2, 0, 0)
-    end_date = exec_date + datetime.timedelta(seconds=30)
 
     with create_session() as session:
         dag_run = create_dag_run(execution_date=exec_date, session=session)
@@ -167,7 +166,7 @@ def test_extra_link_in_gantt_view(dag, create_dag_run, 
viewer_client):
             ti.refresh_from_task(dag.get_task(ti.task_id))
             ti.state = TaskInstanceState.SUCCESS
             ti.start_date = start_date
-            ti.end_date = end_date
+            ti.end_date = start_date + datetime.timedelta(seconds=30)
             session.merge(ti)
 
     url = f'gantt?dag_id={dag.dag_id}&execution_date={exec_date}'
diff --git a/tests/www/views/test_views_task_norun.py 
b/tests/www/views/test_views_task_norun.py
index 4f5dc092b3..3790148fc9 100644
--- a/tests/www/views/test_views_task_norun.py
+++ b/tests/www/views/test_views_task_norun.py
@@ -15,14 +15,14 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import datetime
 import urllib.parse
 
 import pytest
 
-from airflow.utils import dates
 from tests.test_utils.db import clear_db_runs
 
-DEFAULT_DATE = dates.days_ago(2)
+DEFAULT_DATE = datetime.datetime(2022, 1, 1)
 
 DEFAULT_VAL = urllib.parse.quote_plus(str(DEFAULT_DATE))
 
diff --git a/tests/www/views/test_views_tasks.py 
b/tests/www/views/test_views_tasks.py
index ebed9ab05f..3428ac848a 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -32,7 +32,7 @@ from airflow.models.dagcode import DagCode
 from airflow.operators.bash import BashOperator
 from airflow.security import permissions
 from airflow.ti_deps.dependencies_states import QUEUEABLE_STATES, 
RUNNABLE_STATES
-from airflow.utils import dates, timezone
+from airflow.utils import timezone
 from airflow.utils.log.logging_mixin import ExternalLoggingMixin
 from airflow.utils.session import create_session
 from airflow.utils.state import State
@@ -43,7 +43,7 @@ from tests.test_utils.config import conf_vars
 from tests.test_utils.db import clear_db_runs
 from tests.test_utils.www import check_content_in_response, 
check_content_not_in_response, client_with_login
 
-DEFAULT_DATE = dates.days_ago(2)
+DEFAULT_DATE = timezone.utcnow().replace(hour=0, minute=0, second=0, 
microsecond=0)
 
 DEFAULT_VAL = urllib.parse.quote_plus(str(DEFAULT_DATE))
 

Reply via email to