Hi all,

Sharing a simplified use case for [AIP-76: Asset 
Partitions](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-76+Asset+Partitions)
 from my team:
(written in markdown style, gist 
here:https://gist.github.com/Alexhans/d6a2551507ad7af37b2ca550a95424e2)

# Use Case
- Process partitioned upstream data `just in time`, processing it at a `per 
partition` level.  Partitions don't all appear at the same time.
- The generated data is partitioned and other dags can, in turn, depend on it 
(With a strategy of micropipelines that output to one or multiple datasets)

**Note**: At the moment, we don't have a signal from our upstream tables so we 
need to resort to polling and use custom sensors (calling a completeness API) 
for that.  The combination of 
[AIP-82](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-82+External+event+driven+scheduling+in+Airflow?focusedCommentId=315493589)
 and this one should remove the need from using sensors, even if we don't have 
signals from upstream yet.  Currently we also have some fat pipelines but are 
slowly moving in the smaller pipeline direction, like in the example here.

A typical dag could look like this:

```python
with DAG(
    schedule="@daily",
    # [...]
    for region in covered_regions:
        group_id = f"region_{region}"
        with TaskGroup(group_id=group_id) as per_region:
            # [...]
            # Process partitions in parallel
            (
                [upstream_table_aaa_sensor, upstream_table_bbb_sensor]
                >> data_transformations
                >> metadata_creation
            )
    # [...]
```

**NOTE**: The rationale for having a single dag run with multiple regions in 
branches instead of multiple runs is:
- I can pass the relevant region parameter for each branch and don't need an 
extra dag to trigger.
- The grid UI is very clean.  Horizontally, only one run appears per day.  The 
different regions are collapsable. Very easy to reason about.

The custom sensors could look like this (being parameterized with the upstream 
metadata to determine data completeness):

```python
upstream_table_aaa_sensor = SomeCustomSensor(
    deferrable=True,
    payload={
        "table": "aaa",
        "partitions": {
            "region": region_data_from_xcom_or_opargs,
            "day": day_data_from_xcom_or_opargs,
        }
    }
)

upstream_table_bbb_sensor = SomeCustomSensor(
    deferrable=True,
    payload={
        "table": "bbb",
        "partitions": {
            "region": region_data_from_xcom_or_opargs,
            "day": day_data_from_xcom_or_opargs,
        }
    }
)
```

If I'm interpreting things correctly, With `AIP-76`, it seems I would have to 
break up the idea of using a single run a day for all regions and instead the 
DataAsset would, with a specific version of partitions, run the appropriate 
dag_runs, e.g.:

- Once conditions are met, run for `[region="some_region", day=some_day]`
- Once conditions are met, run for `[region="another_region", day=some_day]`

```python
aaa_asset = DataAsset(
    asset_id="aaa",
    schedule="@daily",
    # I omitted the "day" partition which would be present in a HIVE structure
    # /day=2024-01-01/region=some_region because it seems that would allow me to
    # use the right TimeInterval (start-end) and support moving daily 
partitioned jobs to
    # hourly with ease.  With Iceberg it might make even more sense.
    partition=PartitionBySequence(["region"]),
)
bbb_asset = DataAsset(asset_id="bbb", schedule="@daily", 
partition=PartitionBySequence(["region"]))
```
The dag would use both those assets as schedule

```python
with DAG(
    # [...]
    schedule=aaa_asset & bbb_asset,
    # [...] It would be a parameterized partition based run.
    data_transformations >> metadata_creation
```

- Is my interpretation correct?
- Are we thinking of a UI that would allow us to represent things cleanly in 
the grid to be able to differentiate/group the runs for different regions that 
occur within the same day (In the general case, grouping partitions in the UI 
by their TimeInterval segment)?

Best,
Alex

Reply via email to