ashb commented on a change in pull request #19965:
URL: https://github.com/apache/airflow/pull/19965#discussion_r771464168



##########
File path: airflow/decorators/base.py
##########
@@ -176,11 +178,110 @@ def _hook_apply_defaults(self, *args, **kwargs):
 
 T = TypeVar("T", bound=Callable)
 
+OperatorSubclass = TypeVar("OperatorSubclass", bound="BaseOperator")
+
+
+@attr.define
+class OperatorWrapper(Generic[T, OperatorSubclass]):
+    """
+    Helper class for providing dynamic task mapping to decorated functions.
+
+    ``task_decorator_factory`` returns an instance of this, instead of just a plain wrapped function.
+
+    :meta private:
+    """
+
+    function: T = attr.ib(validator=attr.validators.is_callable())
+    operator_class: Type[OperatorSubclass]
+    multiple_outputs: bool = attr.ib()
+    kwargs: Dict[str, Any] = attr.ib(factory=dict)
+
+    decorator_name: str = attr.ib(repr=False, default="task")
+    function_arg_names: Set[str] = attr.ib(repr=False)
+
+    @function_arg_names.default
+    def _get_arg_names(self):
+        return set(inspect.signature(self.function).parameters)

Review comment:
       This would be the change that addresses most of your points. What do you think?
   
   ```diff
   diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py
   index 3ee13aed4..6f1fa53b9 100644
   --- a/airflow/decorators/base.py
   +++ b/airflow/decorators/base.py
   @@ -28,6 +28,7 @@ from airflow.models.baseoperator import BaseOperator, MappedOperator
    from airflow.models.dag import DAG, DagContext
    from airflow.models.xcom_arg import XComArg
    from airflow.utils.task_group import TaskGroup, TaskGroupContext
   +from airflow.compat.functools import cached_property
    
    
    def validate_python_callable(python_callable):
   @@ -181,7 +182,7 @@ T = TypeVar("T", bound=Callable)
    OperatorSubclass = TypeVar("OperatorSubclass", bound="BaseOperator")
    
    
   -@attr.define
   +@attr.define(slots=False)
    class OperatorWrapper(Generic[T, OperatorSubclass]):
        """
        Helper class for providing dynamic task mapping to decorated functions.
   @@ -199,22 +200,32 @@ class OperatorWrapper(Generic[T, OperatorSubclass]):
        decorator_name: str = attr.ib(repr=False, default="task")
        function_arg_names: Set[str] = attr.ib(repr=False)
    
   +    @cached_property
   +    def function_signature(self):
   +        return inspect.signature(self.function)
   +
        @function_arg_names.default
        def _get_arg_names(self):
   -        return set(inspect.signature(self.function).parameters)
   +        return set(self.function_signature.parameters)
   +
   +    del _get_arg_names
    
        @function.validator
        def _validate_function(self, _, f):
            if 'self' in self.function_arg_names:
                raise TypeError(f'@{self.decorator_name} does not support methods')
    
   +    del _validate_function
   +
        @multiple_outputs.default
        def _infer_multiple_outputs(self):
   -        sig = inspect.signature(self.function).return_annotation
   +        sig = self.function_signature.return_annotation
            ttype = getattr(sig, "__origin__", None)
    
            return sig is not inspect.Signature.empty and ttype in (dict, Dict)
    
   +    del _infer_multiple_outputs
   +
        def __attrs_post_init__(self):
            self.kwargs.setdefault('task_id', self.function.__name__)
    
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to