This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 77c38f357c3 docs(asset-partition): add example Dag (#62491)
77c38f357c3 is described below
commit 77c38f357c3ff7af50ceda862f837dab7a93f359
Author: Wei Lee <[email protected]>
AuthorDate: Tue Mar 3 20:55:32 2026 +0800
docs(asset-partition): add example Dag (#62491)
---
.../example_dags/example_asset_partition.py | 139 +++++++++++++++++++++
1 file changed, 139 insertions(+)
diff --git a/airflow-core/src/airflow/example_dags/example_asset_partition.py b/airflow-core/src/airflow/example_dags/example_asset_partition.py
new file mode 100644
index 00000000000..e744a2dc1ce
--- /dev/null
+++ b/airflow-core/src/airflow/example_dags/example_asset_partition.py
@@ -0,0 +1,139 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from airflow.sdk import (
+ DAG,
+ Asset,
+ CronPartitionTimetable,
+ HourlyMapper,
+ PartitionedAssetTimetable,
+ YearlyMapper,
+ asset,
+ task,
+)
+
+team_a_player_stats = Asset(uri="file://incoming/player-stats/team_a.csv", name="team_a_player_stats")
+combined_player_stats = Asset(uri="file://curated/player-stats/combined.csv", name="combined_player_stats")
+
+
+with DAG(
+ dag_id="ingest_team_a_player_stats",
+ schedule=CronPartitionTimetable("0 * * * *", timezone="UTC"),
+ tags=["player-stats", "ingestion"],
+):
+ """Produce hourly partitioned stats for Team A."""
+
+ @task(outlets=[team_a_player_stats])
+ def ingest_team_a_stats():
+ """Materialize Team A player statistics for the current hourly partition."""
+ pass
+
+ ingest_team_a_stats()
+
+
+@asset(
+ uri="file://incoming/player-stats/team_b.csv",
+ schedule=CronPartitionTimetable("15 * * * *", timezone="UTC"),
+ tags=["player-stats", "ingestion"],
+)
+def team_b_player_stats():
+ """Produce hourly partitioned stats for Team B."""
+ pass
+
+
+@asset(
+ uri="file://incoming/player-stats/team_c.csv",
+ schedule=CronPartitionTimetable("30 * * * *", timezone="UTC"),
+ tags=["player-stats", "ingestion"],
+)
+def team_c_player_stats():
+ """Produce hourly partitioned stats for Team C."""
+ pass
+
+
+with DAG(
+ dag_id="clean_and_combine_player_stats",
+ schedule=PartitionedAssetTimetable(
+ assets=team_a_player_stats & team_b_player_stats & team_c_player_stats,
+ default_partition_mapper=HourlyMapper(),
+ ),
+ catchup=False,
+ tags=["player-stats", "cleanup"],
+):
+ """
+ Combine hourly partitions from Team A, B and C into a single curated dataset.
+
+ This Dag demonstrates multi-asset partition alignment using HourlyMapper.
+ """
+
+ @task(outlets=[combined_player_stats])
+ def combine_player_stats(dag_run=None):
+ """Merge the aligned hourly partitions into a combined dataset."""
+ if TYPE_CHECKING:
+ assert dag_run
+ print(dag_run.partition_key)
+
+ combine_player_stats()
+
+
+@asset(
+ uri="file://analytics/player-stats/computed-player-odds.csv",
+ # Falls back to IdentityMapper if no partition_mapper is specified.
+ # If we want to use another temporal mapper (e.g., HourlyMapper) here,
+ # make sure the input_format is changed since the partition_key is now in "%Y-%m-%dT%H" format
+ # instead of a valid timestamp.
+ schedule=PartitionedAssetTimetable(assets=combined_player_stats),
+ tags=["player-stats", "odds"],
+)
+def compute_player_odds():
+ """
+ Compute player odds from the combined hourly statistics.
+
+ This asset is partition-aware and triggered by the combined stats asset.
+ """
+ pass
+
+
+with DAG(
+ dag_id="player_odds_quality_check_wont_ever_to_trigger",
+ schedule=PartitionedAssetTimetable(
+ assets=(combined_player_stats & team_a_player_stats & Asset.ref(name="team_b_player_stats")),
+ partition_mapper_config={
+ combined_player_stats: YearlyMapper(), # incompatible on purpose
+ team_a_player_stats: HourlyMapper(),
+ Asset.ref(name="team_b_player_stats"): HourlyMapper(),
+ },
+ ),
+ catchup=False,
+ tags=["player-stats", "odds"],
+):
+ """
+ Demonstrate a partition mapper mismatch scenario.
+
+ The configured partition mappers transform partition keys into formats
+ that never match ("%Y" vs. "%Y-%m-%dT%H"), so the Dag will never trigger.
+ """
+
+ @task
+ def check_partition_alignment():
+ pass
+
+ check_partition_alignment()