ashb commented on a change in pull request #19965:
URL: https://github.com/apache/airflow/pull/19965#discussion_r771516463



##########
File path: airflow/models/baseoperator.py
##########
@@ -1659,6 +1629,115 @@ def defer(
         """
         raise TaskDeferred(trigger=trigger, method_name=method_name, 
kwargs=kwargs, timeout=timeout)
 
+    def map(self, **kwargs) -> "MappedOperator":
+        return MappedOperator(
+            operator_class=type(self),
+            operator=self,
+            task_id=self.task_id,
+            task_group=getattr(self, 'task_group', None),
+            dag=getattr(self, '_dag', None),
+            start_date=self.start_date,
+            end_date=self.end_date,
+            partial_kwargs=self.__init_kwargs,
+            mapped_kwargs=kwargs,
+        )
+
+
+def _validate_kwarg_names_for_mapping(cls: Type[BaseOperator], func_name: str, 
value: Dict[str, Any]):
+    if isinstance(str, cls):
+        # Serialized version -- would have been validated at parse time
+        return
+
+    # use a dict so order of args is same as code order
+    unknown_args = value.copy()
+    for clazz in cls.mro():
+        # Mypy doesn't like doing `clas.__init__`, Error is: Cannot access 
"__init__" directly
+        init = clazz.__init__  # type: ignore
+
+        if not hasattr(init, '_BaseOperatorMeta__param_names'):
+            continue
+
+        for name in init._BaseOperatorMeta__param_names:
+            unknown_args.pop(name, None)
+
+        if not unknown_args:
+            # If we have no args left to check: stop looking at the MRO chain
+            return
+
+    if len(unknown_args) == 1:
+        raise TypeError(
+            f'{cls.__name__}.{func_name} got unexpected keyword argument 
{unknown_args.popitem()[0]!r}'
+        )
+    else:
+        names = ", ".join(repr(n) for n in unknown_args)
+        raise TypeError(f'{cls.__name__}.{func_name} got unexpected keyword 
arguments {names}')
+
+
+@attr.define(kw_only=True)
+class MappedOperator(DAGNode):
+    """Object representing a mapped operator in a DAG"""
+
+    operator_class: Type[BaseOperator] = attr.ib(repr=lambda c: c.__name__)

Review comment:
       See 39b42fa49




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to