This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8dd358add2e AIP-76: Add ProductMapper for multi-dimensional partition 
keys (#61937)
8dd358add2e is described below

commit 8dd358add2e595f8610d44dcfc91a1143b8ad9c4
Author: Anish Giri <[email protected]>
AuthorDate: Thu Mar 5 04:42:43 2026 -0600

    AIP-76: Add ProductMapper for multi-dimensional partition keys (#61937)
---
 .../example_dags/example_asset_partition.py        |  47 +++++++++
 .../src/airflow/partition_mappers/product.py       |  67 ++++++++++++
 airflow-core/src/airflow/serialization/encoders.py |   9 ++
 .../tests/unit/partition_mappers/test_product.py   | 113 +++++++++++++++++++++
 .../unit/serialization/test_serialized_objects.py  |  43 ++++++++
 task-sdk/docs/api.rst                              |   2 +
 task-sdk/src/airflow/sdk/__init__.py               |   3 +
 task-sdk/src/airflow/sdk/__init__.pyi              |   2 +
 .../sdk/definitions/partition_mappers/product.py   |  34 +++++++
 9 files changed, 320 insertions(+)

diff --git a/airflow-core/src/airflow/example_dags/example_asset_partition.py 
b/airflow-core/src/airflow/example_dags/example_asset_partition.py
index e744a2dc1ce..69c9d883e63 100644
--- a/airflow-core/src/airflow/example_dags/example_asset_partition.py
+++ b/airflow-core/src/airflow/example_dags/example_asset_partition.py
@@ -23,8 +23,11 @@ from airflow.sdk import (
     DAG,
     Asset,
     CronPartitionTimetable,
+    DailyMapper,
     HourlyMapper,
+    IdentityMapper,
     PartitionedAssetTimetable,
+    ProductMapper,
     YearlyMapper,
     asset,
     task,
@@ -137,3 +140,47 @@ with DAG(
         pass
 
     check_partition_alignment()
+
+
# Upstream asset: hourly regional sales files keyed by a composite
# "region|timestamp" partition key.
regional_sales = Asset(uri="file://incoming/sales/regional.csv", name="regional_sales")

with DAG(
    dag_id="ingest_regional_sales",
    # Runs at the top of every hour (UTC) via the cron partition timetable.
    schedule=CronPartitionTimetable("0 * * * *", timezone="UTC"),
    tags=["sales", "ingestion"],
):
    """Produce hourly regional sales data with composite partition keys."""

    @task(outlets=[regional_sales])
    def ingest_sales():
        """Ingest regional sales data partitioned by region and time."""
        pass

    ingest_sales()


with DAG(
    dag_id="aggregate_regional_sales",
    schedule=PartitionedAssetTimetable(
        assets=regional_sales,
        # Segment 0 (region) passes through unchanged; segment 1 (timestamp)
        # is coarsened from hourly events to a daily partition key.
        default_partition_mapper=ProductMapper(IdentityMapper(), DailyMapper()),
    ),
    catchup=False,
    tags=["sales", "aggregation"],
):
    """
    Aggregate regional sales using ProductMapper.

    The ProductMapper splits the composite key "region|timestamp" and applies
    IdentityMapper to the region segment and DailyMapper to the timestamp segment,
    aligning hourly partitions to daily granularity per region.
    """

    @task
    def aggregate_sales(dag_run=None):
        """Aggregate sales data for the matched region-day partition."""
        # NOTE(review): TYPE_CHECKING is referenced here but this hunk does not
        # add `from typing import TYPE_CHECKING` — confirm the module already
        # imports it, otherwise this line raises NameError at task runtime.
        if TYPE_CHECKING:
            assert dag_run
        print(dag_run.partition_key)

    aggregate_sales()
diff --git a/airflow-core/src/airflow/partition_mappers/product.py 
b/airflow-core/src/airflow/partition_mappers/product.py
new file mode 100644
index 00000000000..5d8882bbbcc
--- /dev/null
+++ b/airflow-core/src/airflow/partition_mappers/product.py
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import Any
+
+from airflow.partition_mappers.base import PartitionMapper
+
+
class ProductMapper(PartitionMapper):
    """Partition mapper that combines multiple mappers into a multi-dimensional key.

    A composite key such as ``"region|2024-01-15T10:30:00"`` is split on
    ``delimiter``; segment *i* is transformed by child mapper *i*, and the
    transformed segments are re-joined with the same delimiter.
    """

    def __init__(
        self,
        mapper0: PartitionMapper,
        mapper1: PartitionMapper,
        /,
        *mappers: PartitionMapper,
        delimiter: str = "|",
    ) -> None:
        # The two positional-only parameters enforce "at least two children"
        # at the signature level.
        self.mappers = list((mapper0, mapper1) + mappers)
        self.delimiter = delimiter

    def to_downstream(self, key: str) -> str:
        """Map each delimiter-separated segment of *key* through its paired child mapper."""
        segments = key.split(self.delimiter)
        expected = len(self.mappers)
        if len(segments) != expected:
            raise ValueError(f"Expected {expected} segments in key, got {len(segments)}")
        mapped: list[str] = []
        for mapper, segment in zip(self.mappers, segments):
            downstream = mapper.to_downstream(segment)
            # A child that fans out to multiple keys cannot be composed into
            # one joined key.
            if not isinstance(downstream, str):
                raise TypeError(
                    f"ProductMapper child mappers must return a single key, "
                    f"but {type(mapper).__name__} returned multiple keys"
                )
            mapped.append(downstream)
        return self.delimiter.join(mapped)

    def serialize(self) -> dict[str, Any]:
        """Return a JSON-safe dict holding the delimiter and recursively encoded children."""
        from airflow.serialization.encoders import encode_partition_mapper

        return {
            "delimiter": self.delimiter,
            "mappers": [encode_partition_mapper(child) for child in self.mappers],
        }

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> PartitionMapper:
        """Rebuild a ProductMapper from :meth:`serialize` output; delimiter defaults to ``|``."""
        from airflow.serialization.decoders import decode_partition_mapper

        children = [decode_partition_mapper(encoded) for encoded in data["mappers"]]
        return cls(*children, delimiter=data.get("delimiter", "|"))
diff --git a/airflow-core/src/airflow/serialization/encoders.py 
b/airflow-core/src/airflow/serialization/encoders.py
index f82af80c70c..0242b6ee61d 100644
--- a/airflow-core/src/airflow/serialization/encoders.py
+++ b/airflow-core/src/airflow/serialization/encoders.py
@@ -43,6 +43,7 @@ from airflow.sdk import (
     MonthlyMapper,
     MultipleCronTriggerTimetable,
     PartitionMapper,
+    ProductMapper,
     QuarterlyMapper,
     WeeklyMapper,
     YearlyMapper,
@@ -373,6 +374,7 @@ class _Serializer:
         MonthlyMapper: "airflow.partition_mappers.temporal.MonthlyMapper",
         QuarterlyMapper: "airflow.partition_mappers.temporal.QuarterlyMapper",
         YearlyMapper: "airflow.partition_mappers.temporal.YearlyMapper",
+        ProductMapper: "airflow.partition_mappers.product.ProductMapper",
     }
 
     @functools.singledispatchmethod
@@ -407,6 +409,13 @@ class _Serializer:
             "output_format": partition_mapper.output_format,
         }
 
+    @serialize_partition_mapper.register
+    def _(self, partition_mapper: ProductMapper) -> dict[str, Any]:
+        return {
+            "delimiter": partition_mapper.delimiter,
+            "mappers": [encode_partition_mapper(m) for m in 
partition_mapper.mappers],
+        }
+
 
 _serializer = _Serializer()
 
diff --git a/airflow-core/tests/unit/partition_mappers/test_product.py 
b/airflow-core/tests/unit/partition_mappers/test_product.py
new file mode 100644
index 00000000000..a1c46abc629
--- /dev/null
+++ b/airflow-core/tests/unit/partition_mappers/test_product.py
@@ -0,0 +1,113 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import pytest
+
+from airflow.partition_mappers.identity import IdentityMapper
+from airflow.partition_mappers.product import ProductMapper
+from airflow.partition_mappers.temporal import DailyMapper, HourlyMapper
+
+
class TestProductMapper:
    """Unit tests for ProductMapper: segment mapping, delimiters, and round-trip serialization."""

    def test_to_downstream(self):
        mapper = ProductMapper(HourlyMapper(), DailyMapper())
        mapped = mapper.to_downstream("2024-01-15T10:30:00|2024-01-15T10:30:00")
        assert mapped == "2024-01-15T10|2024-01-15"

    def test_to_downstream_wrong_segment_count(self):
        # Three segments against two mappers must be rejected.
        mapper = ProductMapper(HourlyMapper(), DailyMapper())
        with pytest.raises(ValueError, match="Expected 2 segments"):
            mapper.to_downstream("2024-01-15T10:30:00|2024-01-15T10:30:00|extra")

    def test_to_downstream_single_segment_for_two_mappers(self):
        mapper = ProductMapper(HourlyMapper(), DailyMapper())
        with pytest.raises(ValueError, match="Expected 2 segments"):
            mapper.to_downstream("2024-01-15T10:30:00")

    def test_custom_delimiter(self):
        mapper = ProductMapper(HourlyMapper(), DailyMapper(), delimiter="::")
        mapped = mapper.to_downstream("2024-01-15T10:30:00::2024-01-15T10:30:00")
        assert mapped == "2024-01-15T10::2024-01-15"

    def test_custom_delimiter_wrong_segment_count(self):
        mapper = ProductMapper(HourlyMapper(), DailyMapper(), delimiter="::")
        with pytest.raises(ValueError, match="Expected 2 segments"):
            mapper.to_downstream("2024-01-15T10:30:00::2024-01-15T10:30:00::extra")

    def test_serialize(self):
        from airflow.serialization.encoders import encode_partition_mapper

        mapper = ProductMapper(HourlyMapper(), DailyMapper())
        expected = {
            "delimiter": "|",
            "mappers": [
                encode_partition_mapper(HourlyMapper()),
                encode_partition_mapper(DailyMapper()),
            ],
        }
        assert mapper.serialize() == expected

    def test_serialize_custom_delimiter(self):
        from airflow.serialization.encoders import encode_partition_mapper

        mapper = ProductMapper(HourlyMapper(), DailyMapper(), delimiter="::")
        expected = {
            "delimiter": "::",
            "mappers": [
                encode_partition_mapper(HourlyMapper()),
                encode_partition_mapper(DailyMapper()),
            ],
        }
        assert mapper.serialize() == expected

    def test_deserialize(self):
        original = ProductMapper(HourlyMapper(), DailyMapper())
        restored = ProductMapper.deserialize(original.serialize())
        assert isinstance(restored, ProductMapper)
        assert len(restored.mappers) == 2
        assert restored.delimiter == "|"
        assert restored.to_downstream("2024-01-15T10:30:00|2024-01-15T10:30:00") == "2024-01-15T10|2024-01-15"

    def test_deserialize_custom_delimiter(self):
        original = ProductMapper(HourlyMapper(), DailyMapper(), delimiter="::")
        restored = ProductMapper.deserialize(original.serialize())
        assert isinstance(restored, ProductMapper)
        assert restored.delimiter == "::"
        assert restored.to_downstream("2024-01-15T10:30:00::2024-01-15T10:30:00") == "2024-01-15T10::2024-01-15"

    def test_deserialize_backward_compat(self):
        """Deserializing data without delimiter field defaults to '|'."""
        from airflow.serialization.encoders import encode_partition_mapper

        payload = {
            "mappers": [
                encode_partition_mapper(HourlyMapper()),
                encode_partition_mapper(DailyMapper()),
            ],
        }
        assert ProductMapper.deserialize(payload).delimiter == "|"

    def test_three_mappers(self):
        mapper = ProductMapper(HourlyMapper(), DailyMapper(), IdentityMapper())
        mapped = mapper.to_downstream("2024-01-15T10:30:00|2024-01-15T10:30:00|raw")
        assert mapped == "2024-01-15T10|2024-01-15|raw"
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py 
b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index d4f549f6df1..e001def29a9 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -834,6 +834,49 @@ def test_decode_partition_mapper_not_exists():
         decode_partition_mapper({Encoding.TYPE: "not_exists", Encoding.VAR: 
{}})
 
 
def test_encode_product_mapper():
    """Encoding a ProductMapper recursively encodes its child mappers with type tags."""
    from airflow.sdk import HourlyMapper, IdentityMapper, ProductMapper
    from airflow.serialization.encoders import encode_partition_mapper

    encoded = encode_partition_mapper(ProductMapper(IdentityMapper(), HourlyMapper()))
    expected = {
        Encoding.TYPE: "airflow.partition_mappers.product.ProductMapper",
        Encoding.VAR: {
            "delimiter": "|",
            "mappers": [
                {
                    Encoding.TYPE: "airflow.partition_mappers.identity.IdentityMapper",
                    Encoding.VAR: {},
                },
                {
                    Encoding.TYPE: "airflow.partition_mappers.temporal.HourlyMapper",
                    Encoding.VAR: {
                        "input_format": "%Y-%m-%dT%H:%M:%S",
                        "output_format": "%Y-%m-%dT%H",
                    },
                },
            ],
        },
    }
    assert encoded == expected
+
+
def test_decode_product_mapper():
    """Decoding an encoded SDK ProductMapper yields the working core implementation."""
    from airflow.partition_mappers.product import ProductMapper as CoreProductMapper
    from airflow.sdk import DailyMapper, HourlyMapper, ProductMapper
    from airflow.serialization.decoders import decode_partition_mapper
    from airflow.serialization.encoders import encode_partition_mapper

    encoded_pm = encode_partition_mapper(ProductMapper(HourlyMapper(), DailyMapper()))

    decoded = decode_partition_mapper(encoded_pm)

    assert isinstance(decoded, CoreProductMapper)
    assert len(decoded.mappers) == 2
    assert decoded.delimiter == "|"
    assert decoded.to_downstream("2024-06-15T10:30:00|2024-06-15T10:30:00") == "2024-06-15T10|2024-06-15"
+
+
 class TestSerializedBaseOperator:
     # ensure the default logging config is used for this test, no matter what 
ran before
     @pytest.mark.usefixtures("reset_logging_config")
diff --git a/task-sdk/docs/api.rst b/task-sdk/docs/api.rst
index 6b400e50448..88cc4205374 100644
--- a/task-sdk/docs/api.rst
+++ b/task-sdk/docs/api.rst
@@ -215,6 +215,8 @@ Partition Mapper
 
 .. autoapiclass:: airflow.sdk.YearlyMapper
 
+.. autoapiclass:: airflow.sdk.ProductMapper
+
 I/O Helpers
 -----------
 .. autoapiclass:: airflow.sdk.ObjectStoragePath
diff --git a/task-sdk/src/airflow/sdk/__init__.py 
b/task-sdk/src/airflow/sdk/__init__.py
index 1259870bdbf..669c87e019a 100644
--- a/task-sdk/src/airflow/sdk/__init__.py
+++ b/task-sdk/src/airflow/sdk/__init__.py
@@ -61,6 +61,7 @@ __all__ = [
     "PartitionedAssetTimetable",
     "PartitionMapper",
     "PokeReturnValue",
+    "ProductMapper",
     "QuarterlyMapper",
     "SkipMixin",
     "SyncCallback",
@@ -122,6 +123,7 @@ if TYPE_CHECKING:
     from airflow.sdk.definitions.param import Param, ParamsDict
     from airflow.sdk.definitions.partition_mappers.base import PartitionMapper
     from airflow.sdk.definitions.partition_mappers.identity import 
IdentityMapper
+    from airflow.sdk.definitions.partition_mappers.product import ProductMapper
     from airflow.sdk.definitions.partition_mappers.temporal import (
         DailyMapper,
         HourlyMapper,
@@ -198,6 +200,7 @@ __lazy_imports: dict[str, str] = {
     "PartitionedAssetTimetable": ".definitions.timetables.assets",
     "PartitionMapper": ".definitions.partition_mappers.base",
     "PokeReturnValue": ".bases.sensor",
+    "ProductMapper": ".definitions.partition_mappers.product",
     "QuarterlyMapper": ".definitions.partition_mappers.temporal",
     "SecretCache": ".execution_time.cache",
     "SkipMixin": ".bases.skipmixin",
diff --git a/task-sdk/src/airflow/sdk/__init__.pyi 
b/task-sdk/src/airflow/sdk/__init__.pyi
index 60b87aeec9a..ed9943700b5 100644
--- a/task-sdk/src/airflow/sdk/__init__.pyi
+++ b/task-sdk/src/airflow/sdk/__init__.pyi
@@ -63,6 +63,7 @@ from airflow.sdk.definitions.edges import EdgeModifier as 
EdgeModifier, Label as
 from airflow.sdk.definitions.param import Param as Param
 from airflow.sdk.definitions.partition_mappers.base import PartitionMapper
 from airflow.sdk.definitions.partition_mappers.identity import IdentityMapper
+from airflow.sdk.definitions.partition_mappers.product import ProductMapper
 from airflow.sdk.definitions.partition_mappers.temporal import (
     DailyMapper,
     HourlyMapper,
@@ -135,6 +136,7 @@ __all__ = [
     "PokeReturnValue",
     "PartitionedAssetTimetable",
     "PartitionMapper",
+    "ProductMapper",
     "QuarterlyMapper",
     "SecretCache",
     "SkipMixin",
diff --git a/task-sdk/src/airflow/sdk/definitions/partition_mappers/product.py 
b/task-sdk/src/airflow/sdk/definitions/partition_mappers/product.py
new file mode 100644
index 00000000000..ffc744ff71e
--- /dev/null
+++ b/task-sdk/src/airflow/sdk/definitions/partition_mappers/product.py
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.sdk.definitions.partition_mappers.base import PartitionMapper
+
+
class ProductMapper(PartitionMapper):
    """Partition mapper that combines multiple mappers into a multi-dimensional key.

    Each delimiter-separated segment of a composite partition key is handled
    by the child mapper at the same position.
    """

    def __init__(
        self,
        mapper0: PartitionMapper,
        mapper1: PartitionMapper,
        /,
        *mappers: PartitionMapper,
        delimiter: str = "|",
    ) -> None:
        # The two positional-only parameters guarantee at least two children.
        self.mappers = list((mapper0, mapper1) + mappers)
        # Separator used to split and re-join composite keys.
        self.delimiter = delimiter

Reply via email to