This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 302efad  Also track task_log_prefix_template changes (#20435)
302efad is described below

commit 302efad167a54f05642ff163c1319b40d2196970
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Tue Dec 21 20:26:57 2021 +0800

    Also track task_log_prefix_template changes (#20435)
---
 UPDATING.md                                        |   6 +-
 ...62e7089_add_task_log_filename_template_model.py |  15 +--
 airflow/models/dagrun.py                           |  35 +++++--
 airflow/models/tasklog.py                          |  12 +--
 airflow/utils/cli.py                               |   4 +-
 airflow/utils/db.py                                |  21 ++--
 airflow/utils/helpers.py                           |   2 +-
 .../log/task_handler_with_custom_formatter.py      |  30 +++---
 docs/apache-airflow/migrations-ref.rst             |   3 +-
 .../test_task_handler_with_custom_formatter.py     | 114 ++++++++++++++-------
 tests/www/views/test_views_log.py                  |  12 +--
 11 files changed, 157 insertions(+), 97 deletions(-)

diff --git a/UPDATING.md b/UPDATING.md
index 36c5488..f076451 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -89,11 +89,11 @@ Smart sensors, an "early access" feature added in Airflow 
2, are now deprecated
 
 See [Migrating to Deferrable 
Operators](https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/smart-sensors.html#migrating-to-deferrable-operators)
 for details on how to migrate.
 
-### Task log filenames are not rendered from database entry instead of config 
value
+### Task log templates are now read from the metadatabase instead of 
`airflow.cfg`
 
-Previously, filename of a task’s log is dynamically rendered from the ``[core] 
log_filename_template`` config value at runtime. This resulted in unfortunate 
characteristics like it is inpractical to modify the config value after an 
Airflow instance is running for a while, since all existing task logs have be 
saved under the previous format and cannot be found with the new config value.
+Previously, a task’s log is dynamically rendered from the `[core] 
log_filename_template` and `[core] task_log_prefix_template` config values at 
runtime. This resulted in unfortunate characteristics, e.g. it is impractical 
to modify the config value after an Airflow instance is running for a while, 
since all existing task logs have been saved under the previous format and cannot 
be found with the new config value.
 
-A new `log_filename` table is introduced to solve this problem. This table is 
synchronised with the aforementioned config value every time Airflow starts, 
and a new field `log_filename_id` is added to every DAG run to point to the 
format used by tasks (`NULL` indicates the first ever entry for compatibility).
+A new `log_template` table is introduced to solve this problem. This table is 
synchronised with the aforementioned config values every time Airflow starts, 
and a new field `log_template_id` is added to every DAG run to point to the 
format used by tasks (`NULL` indicates the first ever entry for compatibility).
 
 ## Airflow 2.2.2
 
diff --git 
a/airflow/migrations/versions/f9da662e7089_add_task_log_filename_template_model.py
 
b/airflow/migrations/versions/f9da662e7089_add_task_log_filename_template_model.py
index 21d5bd3..1ed339d 100644
--- 
a/airflow/migrations/versions/f9da662e7089_add_task_log_filename_template_model.py
+++ 
b/airflow/migrations/versions/f9da662e7089_add_task_log_filename_template_model.py
@@ -36,17 +36,18 @@ depends_on = None
 
 
 def upgrade():
-    """Add model for task log filename template and establish fk on task 
instance."""
+    """Add model for task log template and establish fk on task instance."""
     op.create_table(
-        "log_filename",
+        "log_template",
         Column("id", Integer, primary_key=True, autoincrement=True),
-        Column("template", Text, nullable=False),
+        Column("filename", Text, nullable=False),
+        Column("task_prefix", Text, nullable=False),
         Column("created_at", UtcDateTime, nullable=False),
     )
     dag_run_log_filename_id = Column(
-        "log_filename_id",
+        "log_template_id",
         Integer,
-        ForeignKey("log_filename.id", 
name="task_instance_log_filename_id_fkey", ondelete="NO ACTION"),
+        ForeignKey("log_template.id", 
name="task_instance_log_template_id_fkey", ondelete="NO ACTION"),
     )
     with op.batch_alter_table("dag_run") as batch_op:
         batch_op.add_column(dag_run_log_filename_id)
@@ -55,5 +56,5 @@ def upgrade():
 def downgrade():
     """Remove fk on task instance and model for task log filename template."""
     with op.batch_alter_table("dag_run") as batch_op:
-        batch_op.drop_column("log_filename_id")
-    op.drop_table("log_filename")
+        batch_op.drop_column("log_template_id")
+    op.drop_table("log_template")
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index eb6001e..4b8cb46 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -45,7 +45,7 @@ from airflow.configuration import conf as airflow_conf
 from airflow.exceptions import AirflowException, TaskNotFound
 from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
 from airflow.models.taskinstance import TaskInstance as TI
-from airflow.models.tasklog import LogFilename
+from airflow.models.tasklog import LogTemplate
 from airflow.stats import Stats
 from airflow.ti_deps.dep_context import DepContext
 from airflow.ti_deps.dependencies_states import SCHEDULEABLE_STATES
@@ -96,13 +96,13 @@ class DagRun(Base, LoggingMixin):
     # When a scheduler last attempted to schedule TIs for this DagRun
     last_scheduling_decision = Column(UtcDateTime)
     dag_hash = Column(String(32))
-    # Foreign key to LogFilename. DagRun rows created prior to this column's
+    # Foreign key to LogTemplate. DagRun rows created prior to this column's
     # existence have this set to NULL. Later rows automatically populate this 
on
-    # insert to point to the latest LogFilename entry.
-    log_filename_id = Column(
+    # insert to point to the latest LogTemplate entry.
+    log_template_id = Column(
         Integer,
-        ForeignKey("log_filename.id", 
name="task_instance_log_filename_id_fkey", ondelete="NO ACTION"),
-        default=select([func.max(LogFilename.__table__.c.id)]),
+        ForeignKey("log_template.id", 
name="task_instance_log_template_id_fkey", ondelete="NO ACTION"),
+        default=select([func.max(LogTemplate.__table__.c.id)]),
     )
 
     # Remove this `if` after upgrading Sphinx-AutoAPI
@@ -939,14 +939,27 @@ class DagRun(Base, LoggingMixin):
         return count
 
     @provide_session
-    def get_log_filename_template(self, *, session: Session = NEW_SESSION) -> 
Optional[str]:
-        if self.log_filename_id is None:  # DagRun created before LogFilename 
introduction.
-            template = 
session.query(LogFilename.template).order_by(LogFilename.id).limit(1).scalar()
+    def get_log_filename_template(self, *, session: Session = NEW_SESSION) -> 
str:
+        if self.log_template_id is None:  # DagRun created before LogTemplate 
introduction.
+            template = 
session.query(LogTemplate.filename).order_by(LogTemplate.id).limit(1).scalar()
         else:
-            template = 
session.query(LogFilename.template).filter_by(id=self.log_filename_id).scalar()
+            template = 
session.query(LogTemplate.filename).filter_by(id=self.log_template_id).scalar()
         if template is None:
             raise AirflowException(
-                f"No log_filename entry found for ID {self.log_filename_id!r}. 
"
+                f"No log_template entry found for ID {self.log_template_id!r}. 
"
+                f"Please make sure you set up the metadatabase correctly."
+            )
+        return template
+
+    @provide_session
+    def get_task_prefix_template(self, *, session: Session = NEW_SESSION) -> 
str:
+        if self.log_template_id is None:  # DagRun created before LogTemplate 
introduction.
+            template = 
session.query(LogTemplate.task_prefix).order_by(LogTemplate.id).limit(1).scalar()
+        else:
+            template = 
session.query(LogTemplate.task_prefix).filter_by(id=self.log_template_id).scalar()
+        if template is None:
+            raise AirflowException(
+                f"No log_template entry found for ID {self.log_template_id!r}. 
"
                 f"Please make sure you set up the metadatabase correctly."
             )
         return template
diff --git a/airflow/models/tasklog.py b/airflow/models/tasklog.py
index 7b660cb..bfbac42 100644
--- a/airflow/models/tasklog.py
+++ b/airflow/models/tasklog.py
@@ -23,19 +23,19 @@ from airflow.utils import timezone
 from airflow.utils.sqlalchemy import UtcDateTime
 
 
-class LogFilename(Base):
-    """Model to store ``[core] log_filename_template`` config changes.
+class LogTemplate(Base):
+    """Changes to ``log_filename_template`` and ``task_log_prefix_template``.
 
     This table is automatically populated when Airflow starts up, to store the
     config's value if it does not match the last row in the table.
     """
 
-    __tablename__ = "log_filename"
+    __tablename__ = "log_template"
 
     id = Column(Integer, primary_key=True, autoincrement=True)
-    template = Column(Text, nullable=False)
+    filename = Column(Text, nullable=False)
+    task_prefix = Column(Text, nullable=False)
     created_at = Column(UtcDateTime, nullable=False, default=timezone.utcnow)
 
     def __repr__(self) -> str:
-        created_at = self.created_at.isoformat()
-        return f"LogFilename(id={self.id!r}, template={self.template!r}, 
created_at={created_at!r})"
+        return f"LogTemplate(filename={self.filename!r}, 
task_prefix={self.task_prefix!r})"
diff --git a/airflow/utils/cli.py b/airflow/utils/cli.py
index 351862c..db67999 100644
--- a/airflow/utils/cli.py
+++ b/airflow/utils/cli.py
@@ -34,7 +34,7 @@ from typing import TYPE_CHECKING, Callable, Optional, 
TypeVar, cast
 from airflow import settings
 from airflow.exceptions import AirflowException
 from airflow.utils import cli_action_loggers
-from airflow.utils.db import check_and_run_migrations, 
synchronize_log_filename_template
+from airflow.utils.db import check_and_run_migrations, synchronize_log_template
 from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
 from airflow.utils.platform import getuser, is_terminal_support_colors
 from airflow.utils.session import provide_session
@@ -94,7 +94,7 @@ def action_cli(func=None, check_db=True):
                 # Check and run migrations if necessary
                 if check_db:
                     check_and_run_migrations()
-                    synchronize_log_filename_template()
+                    synchronize_log_template()
                 return f(*args, **kwargs)
             except Exception as e:
                 metrics['error'] = e
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 827654f..e2ad98b 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -55,7 +55,7 @@ from airflow.models import (  # noqa: F401
 
 # We need to add this model manually to get reset working well
 from airflow.models.serialized_dag import SerializedDagModel  # noqa: F401
-from airflow.models.tasklog import LogFilename
+from airflow.models.tasklog import LogTemplate
 from airflow.utils import helpers
 
 # TODO: remove create_session once we decide to break backward compatibility
@@ -722,17 +722,18 @@ def check_and_run_migrations():
 
 
 @provide_session
-def synchronize_log_filename_template(*, session: Session = NEW_SESSION) -> 
None:
-    """Synchronize log filename template config with table.
+def synchronize_log_template(*, session: Session = NEW_SESSION) -> None:
+    """Synchronize log template configs with table.
 
-    This checks if the last row (based on timestamp) matches the current
-    config value, and insert a new row if not.
+    This checks if the last row fully matches the current config values, and
+    insert a new row if not.
     """
-    stored = 
session.query(LogFilename.template).order_by(LogFilename.id.desc()).limit(1).scalar()
-    config = conf.get("logging", "LOG_FILENAME_TEMPLATE")
-    if stored == config:
+    stored = session.query(LogTemplate).order_by(LogTemplate.id.desc()).first()
+    filename = conf.get("logging", "log_filename_template")
+    prefix = conf.get("logging", "task_log_prefix_template")
+    if stored and stored.filename == filename and stored.task_prefix == prefix:
         return
-    session.merge(LogFilename(template=config))
+    session.merge(LogTemplate(filename=filename, task_prefix=prefix))
 
 
 def check_conn_id_duplicates(session: Session) -> Iterable[str]:
@@ -1011,7 +1012,7 @@ def upgradedb(session: Session = NEW_SESSION):
         log.info("Creating tables")
         command.upgrade(config, 'heads')
     add_default_pool_if_not_exists()
-    synchronize_log_filename_template()
+    synchronize_log_template()
 
 
 @provide_session
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index 611c2e4..1ff57ef 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -171,7 +171,7 @@ def as_flattened_list(iterable: Iterable[Iterable[T]]) -> 
List[T]:
     return [e for i in iterable for e in i]
 
 
-def parse_template_string(template_string):
+def parse_template_string(template_string: str) -> Tuple[Optional[str], 
Optional[jinja2.Template]]:
     """Parses Jinja template string."""
     if "{{" in template_string:  # jinja mode
         return None, jinja2.Template(template_string)
diff --git a/airflow/utils/log/task_handler_with_custom_formatter.py 
b/airflow/utils/log/task_handler_with_custom_formatter.py
index b7b431b..6227d42 100644
--- a/airflow/utils/log/task_handler_with_custom_formatter.py
+++ b/airflow/utils/log/task_handler_with_custom_formatter.py
@@ -17,39 +17,45 @@
 """Custom logging formatter for Airflow"""
 
 import logging
-from logging import StreamHandler
+from typing import TYPE_CHECKING, Optional
 
-from airflow.configuration import conf
 from airflow.utils.helpers import parse_template_string, 
render_template_to_string
+from airflow.utils.session import NEW_SESSION, provide_session
 
+if TYPE_CHECKING:
+    from jinja2 import Template
+    from sqlalchemy.orm import Session
 
-class TaskHandlerWithCustomFormatter(StreamHandler):
+    from airflow.models.taskinstance import TaskInstance
+
+
+class TaskHandlerWithCustomFormatter(logging.StreamHandler):
     """Custom implementation of StreamHandler, a class which writes logging 
records for Airflow"""
 
-    def __init__(self, stream):
-        super().__init__()
-        self.prefix_jinja_template = None
+    prefix_jinja_template: Optional["Template"] = None
 
-    def set_context(self, ti):
+    @provide_session
+    def set_context(self, ti, *, session: "Session" = NEW_SESSION) -> None:
         """
         Accept the run-time context (i.e. the current task) and configure the 
formatter accordingly.
 
         :param ti:
         :return:
         """
-        if ti.raw:
+        if ti.raw or self.formatter is None:
             return
-        prefix = conf.get('logging', 'task_log_prefix_template')
+        prefix = ti.get_dagrun().get_task_prefix_template(session=session)
 
-        rendered_prefix = ""
         if prefix:
             _, self.prefix_jinja_template = parse_template_string(prefix)
             rendered_prefix = self._render_prefix(ti)
-        formatter = logging.Formatter(rendered_prefix + ":" + 
self.formatter._fmt)
+        else:
+            rendered_prefix = ""
+        formatter = 
logging.Formatter(f"{rendered_prefix}:{self.formatter._fmt}")
         self.setFormatter(formatter)
         self.setLevel(self.level)
 
-    def _render_prefix(self, ti):
+    def _render_prefix(self, ti: "TaskInstance") -> str:
         if self.prefix_jinja_template:
             jinja_context = ti.get_template_context()
             return render_template_to_string(self.prefix_jinja_template, 
jinja_context)
diff --git a/docs/apache-airflow/migrations-ref.rst 
b/docs/apache-airflow/migrations-ref.rst
index 9c5f4d0..e73f29e 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -23,7 +23,8 @@ Here's the list of all the Database Migrations that are 
executed via when you ru
 
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | Revision ID                    | Revises ID       | Airflow Version | 
Description                                                                     
      |
 
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
-| ``f9da662e7089`` (head)        | ``786e3737b18f`` | ``2.3.0``       | Add 
``LogFilename`` table to track ``log_filename_template`` value changes.         
  |
+| ``f9da662e7089`` (head)        | ``786e3737b18f`` | ``2.3.0``       | Add 
``LogTemplate`` table to track changes to config values 
``log_filename_template`` |
+|                                |                  |                 | and 
``task_log_prefix_template``.                                                   
  |
 
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | ``786e3737b18f``               | ``5e3ec427fdd3`` | ``2.3.0``       | Add 
``timetable_description`` column to DagModel for UI.                            
  |
 
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
diff --git a/tests/utils/test_task_handler_with_custom_formatter.py 
b/tests/utils/test_task_handler_with_custom_formatter.py
index e2a3c77..23fc299 100644
--- a/tests/utils/test_task_handler_with_custom_formatter.py
+++ b/tests/utils/test_task_handler_with_custom_formatter.py
@@ -16,12 +16,15 @@
 # specific language governing permissions and limitations
 # under the License.
 import logging
-import unittest
+
+import pytest
 
 from airflow.config_templates.airflow_local_settings import 
DEFAULT_LOGGING_CONFIG
-from airflow.models import DAG, TaskInstance
+from airflow.models import DAG, DagRun, TaskInstance
+from airflow.models.tasklog import LogTemplate
 from airflow.operators.dummy import DummyOperator
 from airflow.utils.log.logging_mixin import set_context
+from airflow.utils.session import create_session
 from airflow.utils.state import DagRunState
 from airflow.utils.timezone import datetime
 from airflow.utils.types import DagRunType
@@ -29,44 +32,79 @@ from tests.test_utils.config import conf_vars
 from tests.test_utils.db import clear_db_runs
 
 DEFAULT_DATE = datetime(2019, 1, 1)
-TASK_LOGGER = 'airflow.task'
 TASK_HANDLER = 'task'
 TASK_HANDLER_CLASS = 
'airflow.utils.log.task_handler_with_custom_formatter.TaskHandlerWithCustomFormatter'
 PREV_TASK_HANDLER = DEFAULT_LOGGING_CONFIG['handlers']['task']
 
+DAG_ID = "task_handler_with_custom_formatter_dag"
+TASK_ID = "task_handler_with_custom_formatter_task"
+
+
[email protected](scope="module", autouse=True)
+def custom_task_log_handler_config():
+    DEFAULT_LOGGING_CONFIG['handlers']['task'] = {
+        'class': TASK_HANDLER_CLASS,
+        'formatter': 'airflow',
+        'stream': 'sys.stdout',
+    }
+    logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
+    logging.root.disabled = False
+    yield
+    DEFAULT_LOGGING_CONFIG['handlers']['task'] = PREV_TASK_HANDLER
+    logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
+
+
[email protected]()
+def task_instance():
+    dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
+    task = DummyOperator(task_id=TASK_ID, dag=dag)
+    dagrun = dag.create_dagrun(DagRunState.RUNNING, 
execution_date=DEFAULT_DATE, run_type=DagRunType.MANUAL)
+    ti = TaskInstance(task=task, run_id=dagrun.run_id)
+    ti.log.disabled = False
+    yield ti
+    clear_db_runs()
+
+
[email protected]()
+def custom_prefix_template(task_instance):
+    run_filters = [DagRun.dag_id == DAG_ID, DagRun.execution_date == 
DEFAULT_DATE]
+    custom_prefix_template = "{{ ti.dag_id }}-{{ ti.task_id }}"
+    with create_session() as session:
+        log_template = session.merge(LogTemplate(filename="irrelevant", 
task_prefix=custom_prefix_template))
+        session.flush()  # To populate 'log_template.id'.
+        session.query(DagRun).filter(*run_filters).update({"log_template_id": 
log_template.id})
+    yield custom_prefix_template
+    with create_session() as session:
+        session.query(DagRun).filter(*run_filters).update({"log_template_id": 
None})
+        session.query(LogTemplate).filter(LogTemplate.id == 
log_template.id).delete()
+
+
+def assert_prefix(task_instance: TaskInstance, prefix: str) -> None:
+    handler = next((h for h in task_instance.log.handlers if h.name == 
TASK_HANDLER), None)
+    assert handler is not None, "custom task log handler not set up correctly"
+    assert handler.formatter is not None, "custom task log formatter not set 
up correctly"
+    expected_format = f"{prefix}:{handler.formatter._fmt}"
+    set_context(task_instance.log, task_instance)
+    assert expected_format == handler.formatter._fmt
+
+
+def test_custom_formatter_default_format(task_instance):
+    """The default format provides no prefix."""
+    assert_prefix(task_instance, "")
+
+
+@conf_vars({("logging", "task_log_prefix_template"): "this is wrong"})
+def test_custom_formatter_default_format_not_affected_by_config(task_instance):
+    assert_prefix(task_instance, "")
+
+
[email protected]("custom_prefix_template")
+def test_custom_formatter_custom_format(task_instance):
+    """Use the prefix specified from the metadatabase."""
+    assert_prefix(task_instance, f"{DAG_ID}-{TASK_ID}")
+
 
-class TestTaskHandlerWithCustomFormatter(unittest.TestCase):
-    def setUp(self):
-        DEFAULT_LOGGING_CONFIG['handlers']['task'] = {
-            'class': TASK_HANDLER_CLASS,
-            'formatter': 'airflow',
-            'stream': 'sys.stdout',
-        }
-
-        logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
-        logging.root.disabled = False
-
-    def tearDown(self):
-        clear_db_runs()
-        DEFAULT_LOGGING_CONFIG['handlers']['task'] = PREV_TASK_HANDLER
-
-    @conf_vars({('logging', 'task_log_prefix_template'): 
"{{ti.dag_id}}-{{ti.task_id}}"})
-    def test_formatter(self):
-        dag = DAG('test_dag', start_date=DEFAULT_DATE)
-        task = DummyOperator(task_id='test_task', dag=dag)
-        dagrun = dag.create_dagrun(
-            DagRunState.RUNNING,
-            execution_date=DEFAULT_DATE,
-            run_type=DagRunType.MANUAL,
-        )
-        ti = TaskInstance(task=task, run_id=dagrun.run_id)
-
-        logger = ti.log
-        ti.log.disabled = False
-        handler = next((handler for handler in logger.handlers if handler.name 
== TASK_HANDLER), None)
-        assert handler is not None
-
-        # setting the expected value of the formatter
-        expected_formatter_value = "test_dag-test_task:" + 
handler.formatter._fmt
-        set_context(logger, ti)
-        assert expected_formatter_value == handler.formatter._fmt
[email protected]("custom_prefix_template")
+@conf_vars({("logging", "task_log_prefix_template"): "this is wrong"})
+def test_custom_formatter_custom_format_not_affected_by_config(task_instance):
+    assert_prefix(task_instance, f"{DAG_ID}-{TASK_ID}")
diff --git a/tests/www/views/test_views_log.py 
b/tests/www/views/test_views_log.py
index cafd683..6dc4580 100644
--- a/tests/www/views/test_views_log.py
+++ b/tests/www/views/test_views_log.py
@@ -28,7 +28,7 @@ import pytest
 from airflow import settings
 from airflow.config_templates.airflow_local_settings import 
DEFAULT_LOGGING_CONFIG
 from airflow.models import DagBag, DagRun
-from airflow.models.tasklog import LogFilename
+from airflow.models.tasklog import LogTemplate
 from airflow.utils import timezone
 from airflow.utils.log.logging_mixin import ExternalLoggingMixin
 from airflow.utils.session import create_session
@@ -263,15 +263,15 @@ def 
test_get_logs_for_changed_filename_format_config(log_admin_client):
 def dag_run_with_log_filename():
     run_filters = [DagRun.dag_id == DAG_ID, DagRun.execution_date == 
DEFAULT_DATE]
     with create_session() as session:
-        log_filename = 
session.merge(LogFilename(template=DIFFERENT_LOG_FILENAME))
-        session.flush()  # To populate 'log_filename.id'.
+        log_template = 
session.merge(LogTemplate(filename=DIFFERENT_LOG_FILENAME, 
task_prefix="irrelevant"))
+        session.flush()  # To populate 'log_template.id'.
         run_query = session.query(DagRun).filter(*run_filters)
-        run_query.update({"log_filename_id": log_filename.id})
+        run_query.update({"log_template_id": log_template.id})
         dag_run = run_query.one()
     yield dag_run
     with create_session() as session:
-        session.query(DagRun).filter(*run_filters).update({"log_filename_id": 
None})
-        session.query(LogFilename).filter(LogFilename.id == 
log_filename.id).delete()
+        session.query(DagRun).filter(*run_filters).update({"log_template_id": 
None})
+        session.query(LogTemplate).filter(LogTemplate.id == 
log_template.id).delete()
 
 
 def test_get_logs_for_changed_filename_format_db(log_admin_client, 
dag_run_with_log_filename):

Reply via email to