This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1956f38 Enhance magic methods on XComArg for UX (#21882)
1956f38 is described below
commit 1956f38276b982f7f0f72944d4fb8615ee55459f
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Thu Mar 3 16:59:18 2022 +0800
Enhance magic methods on XComArg for UX (#21882)
* Enhance magic methods on XComArg for UX
Implementing Python's magic methods for purposes not matching their
original design can have unwanted consequences due to default
interactions between those methods. This implements a custom __iter__ so
the custom __getitem__ does not trigger the default __iter__ behavior
and result in bad run-time behavior.
---
airflow/models/xcom_arg.py | 18 +++++++++++++++++-
tests/models/test_xcom_arg.py | 14 ++++++++++++++
2 files changed, 31 insertions(+), 1 deletion(-)
diff --git a/airflow/models/xcom_arg.py b/airflow/models/xcom_arg.py
index 29f3f77..2234d1d 100644
--- a/airflow/models/xcom_arg.py
+++ b/airflow/models/xcom_arg.py
@@ -66,10 +66,26 @@ class XComArg(DependencyMixin):
def __eq__(self, other):
return self.operator == other.operator and self.key == other.key
- def __getitem__(self, item):
+ def __getitem__(self, item: str) -> "XComArg":
"""Implements xcomresult['some_result_key']"""
+ if not isinstance(item, str):
+ raise ValueError(f"XComArg only supports str lookup, received {type(item).__name__}")
return XComArg(operator=self.operator, key=item)
+ def __iter__(self):
+ """Override iterable protocol to raise error explicitly.
+
+ The default ``__iter__`` implementation in Python calls ``__getitem__``
+ with 0, 1, 2, etc. until it hits an ``IndexError``. This does not work
+ well with our custom ``__getitem__`` implementation, and results in poor
+ DAG-writing experience since a misplaced ``*`` expansion would create an
+ infinite loop consuming the entire DAG parser.
+
+ This override catches the error eagerly, so an incorrectly implemented
+ DAG fails fast and avoids wasting resources on nonsensical iterating.
+ """
+ raise TypeError(f"{self.__class__.__name__!r} object is not iterable")
+
def __str__(self):
"""
Backward compatibility for old-style jinja used in Airflow Operators
diff --git a/tests/models/test_xcom_arg.py b/tests/models/test_xcom_arg.py
index aec2bef..cd3b548 100644
--- a/tests/models/test_xcom_arg.py
+++ b/tests/models/test_xcom_arg.py
@@ -116,6 +116,13 @@ class TestXComArgBuild:
assert op_a.output == XComArg(op_a)
+ def test_xcom_key_getitem_not_str(self, dag_maker):
+ python_op = build_python_op(dag_maker)
+ actual = XComArg(python_op)
+ with pytest.raises(ValueError) as ctx:
+ actual[1]
+ assert str(ctx.value) == "XComArg only supports str lookup, received int"
+
def test_xcom_key_getitem(self, dag_maker):
python_op = build_python_op(dag_maker)
actual = XComArg(python_op, key="another_key")
@@ -123,6 +130,13 @@ class TestXComArgBuild:
actual_new_key = actual["another_key_2"]
assert actual_new_key.key == "another_key_2"
+ def test_xcom_not_iterable(self, dag_maker):
+ python_op = build_python_op(dag_maker)
+ actual = XComArg(python_op)
+ with pytest.raises(TypeError) as ctx:
+ list(actual)
+ assert str(ctx.value) == "'XComArg' object is not iterable"
+
@pytest.mark.system("core")
class TestXComArgRuntime: