This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new cad63ce86af Add audit log if target dag partition mapper is wrong (#62509)
cad63ce86af is described below
commit cad63ce86af1b87a3d4631e03707b6999041b4cc
Author: Wei Lee <[email protected]>
AuthorDate: Thu Mar 19 10:53:42 2026 +0800
Add audit log if target dag partition mapper is wrong (#62509)
Co-authored-by: Tzu-ping Chung <[email protected]>
---
airflow-core/src/airflow/assets/manager.py | 66 +++++++++++++----
airflow-core/tests/unit/assets/test_manager.py | 1 -
airflow-core/tests/unit/jobs/test_scheduler_job.py | 83 +++++++++++++++++++++-
3 files changed, 136 insertions(+), 14 deletions(-)
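
For context, the new code paths below record these failures as rows in the audit Log table,
under the events "missing partition key" and "failed to map partition_key". A rough sketch of
how such rows could be inspected afterwards (the event names and the Log.extra column come from
the diff; the snippet itself is only illustrative and is not part of this commit):

    from sqlalchemy import select

    from airflow.models.log import Log
    from airflow.utils.session import create_session

    # Illustrative only: list the audit-log rows written when a partition key
    # is missing or cannot be mapped for a partition-aware target Dag.
    with create_session() as session:
        rows = session.scalars(
            select(Log).where(
                Log.event.in_(["missing partition key", "failed to map partition_key"])
            )
        ).all()
        for row in rows:
            # Log.extra carries the human-readable details added by this commit.
            print(row.event, row.extra)
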
diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py
index 9c287209172..a599e79018e 100644
--- a/airflow-core/src/airflow/assets/manager.py
+++ b/airflow-core/src/airflow/assets/manager.py
@@ -41,6 +41,7 @@ from airflow.models.asset import (
DagScheduleAssetUriReference,
PartitionedAssetKeyLog,
)
+from airflow.models.log import Log
from airflow.utils.helpers import is_container
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.sqlalchemy import get_dialect_name, with_row_locks
@@ -201,11 +202,7 @@ class AssetManager(LoggingMixin):
)
)
if not asset_model:
- msg = f"AssetModel {asset} not found; cannot create asset event."
- cls.logger().warning(msg)
- # if there is a task_instance, write to task log
- if task_instance is not None and hasattr(task_instance, "log"):
- task_instance.log.warning(msg)
+ cls.logger().warning("AssetModel %s not found; cannot create asset event.", asset)
return None
if not asset_model.active:
@@ -297,6 +294,7 @@ class AssetManager(LoggingMixin):
dags_to_queue=dags_to_queue,
partition_key=partition_key,
event=asset_event,
+ task_instance=task_instance,
session=session,
)
return asset_event
@@ -341,10 +339,10 @@ class AssetManager(LoggingMixin):
dags_to_queue: set[DagModel],
partition_key: str | None,
event: AssetEvent,
+ task_instance: TaskInstance | None,
session: Session,
) -> None:
- log.debug("dags to queue", dags_to_queue=dags_to_queue)
-
+ log.debug("Dags to queue", dags_to_queue=dags_to_queue)
if not dags_to_queue:
return None
@@ -354,6 +352,7 @@ class AssetManager(LoggingMixin):
partition_dags=partition_dags,
event=event,
partition_key=partition_key,
+ task_instance=task_instance,
session=session,
)
@@ -376,22 +375,38 @@ class AssetManager(LoggingMixin):
@classmethod
def _queue_partitioned_dags(
cls,
+ *,
asset_id: int,
partition_dags: Iterable[DagModel],
event: AssetEvent,
partition_key: str | None,
+ task_instance: TaskInstance | None,
session: Session,
) -> None:
if partition_dags and not partition_key:
- # TODO: AIP-76 how to best ensure users can see this? Probably add Log record.
+ prefix = "Listening Dags are partition-aware but the run has no partition key"
log.warning(
- "Listening Dags are partition-aware but run has no partition
key",
+ prefix,
listening_dags=[x.dag_id for x in partition_dags],
asset_id=asset_id,
run_id=event.source_run_id,
dag_id=event.source_dag_id,
task_id=event.source_task_id,
)
+ msg = (
+ f"{prefix} (listening_dags={[x.dag_id for x in
partition_dags]}, "
+ f"asset_id={asset_id}, "
+ f"run_id={event.source_run_id}, "
+ f"dag_id={event.source_dag_id}, "
+ f"task_id={event.source_task_id})"
+ )
+ session.add(
+ Log(
+ event="missing partition key",
+ extra=msg,
+ task_instance=task_instance,
+ )
+ )
return
for target_dag in partition_dags:
@@ -409,9 +424,36 @@ class AssetManager(LoggingMixin):
if (asset_model := session.scalar(select(AssetModel).where(AssetModel.id == asset_id))) is None:
raise RuntimeError(f"Could not find asset for asset_id={asset_id}")
- target_key = timetable.get_partition_mapper(
- name=asset_model.name, uri=asset_model.uri
- ).to_downstream(partition_key)
+ try:
+ # We need to catch every possible exception that can happen when mapping partition_key.
+ target_key = timetable.get_partition_mapper(
+ name=asset_model.name, uri=asset_model.uri
+ ).to_downstream(partition_key)
+ except Exception as err:
+ log.exception(
+ "Could not map partition key for asset in target Dag. "
+ "This likely indicates the target Dag's partition mapper "
+ "is misconfigured, or does not support this partition
key.",
+ partition_key=partition_key,
+ asset=asset_model,
+ target_dag=target_dag,
+ )
+ log_extra = (
+ f"Could not map partition_key '{partition_key}' for asset "
+ f"(name='{asset_model.name}', uri='{asset_model.uri}') in
target Dag "
+ f"'{target_dag.dag_id}'. This likely indicates that the
partition "
+ f"mapper in the target Dag is misconfigured or does not
support this "
+ f"partition key.\n{type(err).__name__}: {err}"
+ )
+ session.add(
+ Log(
+ event="failed to map partition_key",
+ extra=log_extra,
+ task_instance=task_instance,
+ )
+ )
+ continue
+
if is_container(target_key):
# TODO (AIP-76): This never happens now. When we implement
# one-to-many partition key mapping, this should also add a
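
As an aside, the try/except added above guards the mapper call because a target Dag's partition
mapper may raise for keys it cannot parse. A minimal, self-contained sketch of that failure mode,
using a hypothetical hour-based mapper that only mimics the behaviour exercised by the new test
below (it is not the real airflow.sdk HourlyMapper):

    from datetime import datetime


    class HourBucketMapper:
        """Hypothetical mapper: maps an upstream ISO timestamp to its hour bucket."""

        def to_downstream(self, partition_key: str) -> str:
            # Raises ValueError for keys that are not ISO-8601 timestamps, which is
            # the situation the new "failed to map partition_key" audit log covers.
            dt = datetime.strptime(partition_key, "%Y-%m-%dT%H:%M:%S")
            return dt.strftime("%Y-%m-%dT%H:00:00")


    mapper = HourBucketMapper()
    try:
        mapper.to_downstream("an invalid key for HourlyMapper")
    except ValueError as err:
        # With this commit, the scheduler turns such an error into a Log row and skips
        # the target Dag instead of failing the whole asset-event registration.
        print(f"{type(err).__name__}: {err}")
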
diff --git a/airflow-core/tests/unit/assets/test_manager.py b/airflow-core/tests/unit/assets/test_manager.py
index a3402e8e0ec..82477042d86 100644
--- a/airflow-core/tests/unit/assets/test_manager.py
+++ b/airflow-core/tests/unit/assets/test_manager.py
@@ -90,7 +90,6 @@ class TestAssetManager:
# AssetDagRunQueue rows
mock_session.add.assert_not_called()
mock_session.merge.assert_not_called()
- mock_task_instance.log.warning.assert_called()
@pytest.mark.usefixtures("dag_maker", "testing_dag_bundle")
def test_register_asset_change(self, session, mock_task_instance):
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 4511c333689..23fbecbf73a 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -85,7 +85,16 @@ from airflow.partition_mappers.base import PartitionMapper as CorePartitionMappe
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.triggers.file import FileDeleteTrigger
-from airflow.sdk import DAG, Asset, AssetAlias, AssetWatcher, IdentityMapper, task
+from airflow.sdk import (
+ DAG,
+ Asset,
+ AssetAlias,
+ AssetWatcher,
+ CronPartitionTimetable,
+ HourlyMapper,
+ IdentityMapper,
+ task,
+)
from airflow.sdk.definitions.callback import AsyncCallback, SyncCallback
from airflow.sdk.definitions.timetables.assets import PartitionedAssetTimetable
from airflow.serialization.definitions.dag import SerializedDAG
@@ -8873,6 +8882,78 @@ def _produce_and_register_asset_event(
return apdr
[email protected]_serialized_dag
[email protected]("clear_asset_partition_rows")
+def test_partitioned_dag_run_with_invalid_mapping(dag_maker: DagMaker, session: Session):
+ session.execute(delete(Log))
+ asset_1 = Asset(name="asset-1")
+ with dag_maker(
+ dag_id="asset-event-consumer",
+ schedule=PartitionedAssetTimetable(
+ assets=asset_1,
+ default_partition_mapper=HourlyMapper(),
+ ),
+ session=session,
+ ):
+ EmptyOperator(task_id="hi")
+ session.commit()
+
+ runner = SchedulerJobRunner(
+ job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)]
+ )
+ with dag_maker(
+ dag_id="asset-event-producer",
+ schedule=CronPartitionTimetable("* * * * *", timezone="UTC"),
+ session=session,
+ ) as dag:
+ EmptyOperator(task_id="hi", outlets=[asset_1])
+
+ partition_key = "an invalid key for HourlyMapper"
+ dr = dag_maker.create_dagrun(partition_key=partition_key, session=session)
+ [ti] = dr.get_task_instances(session=session)
+ session.commit()
+
+ serialized_outlets = dag.get_task("hi").outlets
+ TaskInstance.register_asset_changes_in_db(
+ ti=ti,
+ task_outlets=[o.asprofile() for o in serialized_outlets],
+ outlet_events=[],
+ session=session,
+ )
+ session.commit()
+ event = session.scalar(
+ select(AssetEvent).where(
+ AssetEvent.source_dag_id == dag.dag_id,
+ AssetEvent.source_run_id == dr.run_id,
+ )
+ )
+ assert event is not None
+ assert event.partition_key == partition_key
+ apdr = session.scalar(
+ select(AssetPartitionDagRun)
+ .join(
+ PartitionedAssetKeyLog,
+ PartitionedAssetKeyLog.asset_partition_dag_run_id == AssetPartitionDagRun.id,
+ )
+ .where(PartitionedAssetKeyLog.asset_event_id == event.id)
+ )
+ assert apdr is None
+
+ partition_dags = runner._create_dagruns_for_partitioned_asset_dags(session=session)
+ assert len(partition_dags) == 0
+ assert partition_dags == set()
+
+ audit_log = session.scalar(select(Log))
+ assert audit_log is not None
+ assert audit_log.extra == (
+ "Could not map partition_key 'an invalid key for HourlyMapper' "
+ "for asset (name='asset-1', uri='asset-1') in target Dag
'asset-event-consumer'. "
+ "This likely indicates that the partition mapper in the target Dag is
misconfigured or "
+ "does not support this partition key.\n"
+ "ValueError: time data 'an invalid key for HourlyMapper' does not
match format '%Y-%m-%dT%H:%M:%S'"
+ )
+
+
@pytest.mark.need_serialized_dag
@pytest.mark.usefixtures("clear_asset_partition_rows")
def test_partitioned_dag_run_with_customized_mapper(