Hi all,

Sharing a simplified use case for [AIP-76: Asset Partitions](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-76+Asset+Partitions) from my team (written in Markdown; gist here: https://gist.github.com/Alexhans/d6a2551507ad7af37b2ca550a95424e2):
# Use Case

- Process partitioned upstream data just in time, at a per-partition level. Partitions don't all appear at the same time.
- The generated data is partitioned, and other DAGs can, in turn, depend on it (with a strategy of micropipelines that output to one or multiple datasets).

**Note**: At the moment we don't have a signal from our upstream tables, so we have to resort to polling with custom sensors (calling a completeness API). The combination of [AIP-82](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-82+External+event+driven+scheduling+in+Airflow?focusedCommentId=315493589) and this one should remove the need for sensors, even while we still lack signals from upstream.

Currently we also have some fat pipelines, but we are slowly moving in the smaller-pipeline direction, as in the example here. A typical DAG could look like this:

```python
with DAG(
    schedule="@daily",
    # [...]
):
    for region in covered_regions:
        group_id = f"region_{region}"
        with TaskGroup(group_id=group_id) as per_region:
            # [...]
            # Process partitions in parallel
            (
                [upstream_table_aaa_sensor, upstream_table_bbb_sensor]
                >> data_transformations
                >> metadata_creation
            )
    # [...]
```

**Note**: The rationale for a single DAG run with multiple regions as branches, instead of multiple runs, is:

- I can pass the relevant region parameter to each branch and don't need an extra DAG to trigger.
- The grid UI is very clean. Horizontally, only one run appears per day, and the different regions are collapsible. Very easy to reason about.
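To make the fan-out concrete: each branch of the daily run is parameterized by a `(region, day)` partition key. A minimal, Airflow-independent sketch of that expansion (`covered_regions` and the date value here are hypothetical stand-ins for our real inputs):

```python
from datetime import date

# Hypothetical values standing in for the real DAG's inputs.
covered_regions = ["emea", "amer", "apac"]
logical_date = date(2024, 1, 1)

def partition_keys(regions, day):
    """One (region, day) partition key per branch of the daily run."""
    return [{"region": region, "day": day.isoformat()} for region in regions]

keys = partition_keys(covered_regions, logical_date)
# Each key parameterizes one TaskGroup branch:
# sensors >> data_transformations >> metadata_creation
```

In the current design all of these keys live inside one dag_run; the question below is whether AIP-76 would instead spread them across one run per key.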
The custom sensors could look like this (parameterized with the upstream metadata to determine data completeness):

```python
upstream_table_aaa_sensor = SomeCustomSensor(
    deferrable=True,
    payload={
        "table": "aaa",
        "partitions": {
            "region": region_data_from_xcom_or_opargs,
            "day": day_data_from_xcom_or_opargs,
        },
    },
)
upstream_table_bbb_sensor = SomeCustomSensor(
    deferrable=True,
    payload={
        "table": "bbb",
        "partitions": {
            "region": region_data_from_xcom_or_opargs,
            "day": day_data_from_xcom_or_opargs,
        },
    },
)
```

If I'm interpreting things correctly, with `AIP-76` it seems I would have to break up the idea of a single run per day for all regions; instead the DataAsset would, for a specific combination of partition values, trigger the appropriate dag_runs, e.g.:

- Once conditions are met, run for `[region="some_region", day=some_day]`
- Once conditions are met, run for `[region="another_region", day=some_day]`

```python
aaa_asset = DataAsset(
    asset_id="aaa",
    schedule="@daily",
    # I omitted the "day" partition, which would be present in a Hive-style
    # structure (/day=2024-01-01/region=some_region), because it seems the
    # right TimeInterval (start-end) covers it and would make moving daily
    # partitioned jobs to hourly easy. With Iceberg it might make even more sense.
    partition=PartitionBySequence(["region"]),
)
bbb_asset = DataAsset(
    asset_id="bbb",
    schedule="@daily",
    partition=PartitionBySequence(["region"]),
)
```

The DAG would use both of those assets as its schedule, and each run would be a parameterized, partition-based run:

```python
with DAG(
    # [...]
    schedule=aaa_asset & bbb_asset,
    # [...]
):
    data_transformations >> metadata_creation
```

- Is my interpretation correct?
- Are we thinking of a UI that would represent things cleanly in the grid, so that runs for different regions occurring within the same day can be differentiated/grouped (in the general case, grouping partitions in the UI by their TimeInterval segment)?

Best,
Alex
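P.S. To make explicit the per-partition AND semantics I'm assuming `schedule=aaa_asset & bbb_asset` implies, here is a toy model (not AIP-76 code; all names are hypothetical): a run fires for a partition key only once *every* upstream asset has reported that key complete.

```python
from collections import defaultdict

class PartitionAndTrigger:
    """Toy model of per-partition AND scheduling: fire one run per
    partition key once every upstream asset has reported that key."""

    def __init__(self, assets):
        self.assets = set(assets)
        self.seen = defaultdict(set)   # partition key -> assets reported
        self.fired = []                # keys that produced a dag_run

    def on_partition_complete(self, asset, key):
        self.seen[key].add(asset)
        if self.seen[key] == self.assets and key not in self.fired:
            self.fired.append(key)     # would create a dag_run for `key`

trigger = PartitionAndTrigger({"aaa", "bbb"})
trigger.on_partition_complete("aaa", ("some_region", "2024-01-01"))
trigger.on_partition_complete("bbb", ("another_region", "2024-01-01"))
trigger.on_partition_complete("bbb", ("some_region", "2024-01-01"))
# Only ("some_region", "2024-01-01") has both assets complete, so
# exactly one run fires; ("another_region", ...) waits for "aaa".
```

If that matches the intended semantics, the single daily run with region branches becomes N independent per-region runs.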