ashb commented on a change in pull request #19965:
URL: https://github.com/apache/airflow/pull/19965#discussion_r771464168
##########
File path: airflow/decorators/base.py
##########
@@ -176,11 +178,110 @@ def _hook_apply_defaults(self, *args, **kwargs):
T = TypeVar("T", bound=Callable)
+OperatorSubclass = TypeVar("OperatorSubclass", bound="BaseOperator")
+
+
[email protected]
+class OperatorWrapper(Generic[T, OperatorSubclass]):
+ """
+ Helper class for providing dynamic task mapping to decorated functions.
+
+ ``task_decorator_factory`` returns an instance of this, instead of just a
plain wrapped function.
+
+ :meta private:
+ """
+
+ function: T = attr.ib(validator=attr.validators.is_callable())
+ operator_class: Type[OperatorSubclass]
+ multiple_outputs: bool = attr.ib()
+ kwargs: Dict[str, Any] = attr.ib(factory=dict)
+
+ decorator_name: str = attr.ib(repr=False, default="task")
+ function_arg_names: Set[str] = attr.ib(repr=False)
+
+ @function_arg_names.default
+ def _get_arg_names(self):
+ return set(inspect.signature(self.function).parameters)
Review comment:
This would be the change that addresses most of your points. What do you
think?
```diff
diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py
index 3ee13aed4..6f1fa53b9 100644
--- a/airflow/decorators/base.py
+++ b/airflow/decorators/base.py
@@ -28,6 +28,7 @@ from airflow.models.baseoperator import BaseOperator,
MappedOperator
from airflow.models.dag import DAG, DagContext
from airflow.models.xcom_arg import XComArg
from airflow.utils.task_group import TaskGroup, TaskGroupContext
+from airflow.compat.functools import cached_property
def validate_python_callable(python_callable):
@@ -181,7 +182,7 @@ T = TypeVar("T", bound=Callable)
OperatorSubclass = TypeVar("OperatorSubclass", bound="BaseOperator")
[email protected]
[email protected](slots=False)
class OperatorWrapper(Generic[T, OperatorSubclass]):
"""
Helper class for providing dynamic task mapping to decorated functions.
@@ -199,22 +200,32 @@ class OperatorWrapper(Generic[T, OperatorSubclass]):
decorator_name: str = attr.ib(repr=False, default="task")
function_arg_names: Set[str] = attr.ib(repr=False)
+ @cached_property
+ def function_signature(self):
+ return inspect.signature(self.function)
+
@function_arg_names.default
def _get_arg_names(self):
- return set(inspect.signature(self.function).parameters)
+ return set(self.function_signature.parameters)
+
+ del _get_arg_names
@function.validator
def _validate_function(self, _, f):
if 'self' in self.function_arg_names:
raise TypeError(f'@{self.decorator_name} does not support
methods')
+ del _validate_function
+
@multiple_outputs.default
def _infer_multiple_outputs(self):
- sig = inspect.signature(self.function).return_annotation
+ sig = self.function_signature.return_annotation
ttype = getattr(sig, "__origin__", None)
return sig is not inspect.Signature.empty and ttype in (dict, Dict)
+ del _infer_multiple_outputs
+
def __attrs_post_init__(self):
self.kwargs.setdefault('task_id', self.function.__name__)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]