dabla commented on issue #41470:
URL: https://github.com/apache/airflow/issues/41470#issuecomment-2401453034

   > I've looked into this issue more deeply and I believe I can provide a
   > solution. I could use some guidance on whether I should submit a PR for this.
   > 
   > The problem, in my opinion, is that after `_execute_callable` is called,
   > execution enters the `ExecutorSafeguard` wrapper. There, it checks whether it
   > was called inside a `TaskInstance` (hence the `_execute_callable`) and then
   > calls the execution function itself. Because the `execute` method of
   > `BigQueryTableExistenceSensor` is also decorated, it automatically re-enters
   > the `ExecutorSafeguard` wrapper, but this time it appears as if it was called
   > outside a `TaskInstance`, causing the check to fail. With
   > `allow_nested_operators`, it only fires a warning instead of failing.
   > 
   > My proposed solution is to check whether the `ExecutorSafeguard` has already
   > been entered in the current execution context. If so, we can skip the check
   > for nested calls. This can be achieved by setting a flag in thread-local
   > storage, which keeps the approach thread-safe.
   > 
   > ```python
   > class ExecutorSafeguard:
   >     """
   >     The ExecutorSafeguard decorator.
   > 
   >     Checks if the execute method of an operator isn't manually called outside
   >     the TaskInstance as we want to avoid bad mixing between decorated and
   >     classic operators.
   >     """
   > 
   >     test_mode = conf.getboolean("core", "unit_test_mode")
   >     _local = local()
   > 
   >     @classmethod
   >     def decorator(cls, func):
   >         @wraps(func)
   >         def wrapper(self, *args, **kwargs):
   >             if (
   >                 getattr(ExecutorSafeguard._local, "in_executor_safeguard", False)
   >                 and self.allow_nested_operators
   >             ):
   >                 # If already in ExecutorSafeguard, call execution function - recursive call
   >                 return func(self, *args, **kwargs)
   >             ExecutorSafeguard._local.in_executor_safeguard = True
   > 
   >             try:
   >                 from airflow.decorators.base import DecoratedOperator
   > 
   >                 sentinel = kwargs.pop(f"{self.__class__.__name__}__sentinel", None)
   > 
   >                 if (
   >                     not cls.test_mode
   >                     and sentinel != _sentinel
   >                     and not isinstance(self, DecoratedOperator)
   >                 ):
   >                     message = (
   >                         f"{self.__class__.__name__}.{func.__name__} cannot be called outside TaskInstance!"
   >                     )
   >                     raise AirflowException(message)
   >                 return func(self, *args, **kwargs)
   >             finally:
   >                 ExecutorSafeguard._local.in_executor_safeguard = False
   > 
   >         return wrapper
   > ```
   > 
   > This solution would suppress the warning for legitimate nested calls while
   > still protecting against unintended external calls. Because `local()` keeps
   > a separate flag per thread, the check is thread-safe.
   > 
   > I'm willing to create a Pull Request with this solution if the maintainers
   > think this approach is appropriate. What do you think about this solution? Are
   > there any aspects I should consider or modify before proceeding with a PR?
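
To illustrate the thread-safety claim in the quoted comment, here is a minimal sketch of how a flag stored on `threading.local()` behaves. The names `_state`, `in_guard`, `flag_is_set`, and `observe_from_other_thread` are hypothetical and not part of Airflow; the sketch only shows that a flag set in one thread is not visible from another.

```python
import threading

# Hypothetical stand-in for ExecutorSafeguard._local.
_state = threading.local()


def flag_is_set():
    """Return the flag as seen by the calling thread (False if never set there)."""
    return getattr(_state, "in_guard", False)


def observe_from_other_thread():
    """Read the flag from a freshly started thread and return what it saw."""
    seen = []
    worker = threading.Thread(target=lambda: seen.append(flag_is_set()))
    worker.start()
    worker.join()
    return seen[0]


# Set the flag on the main thread only.
_state.in_guard = True
main_view = flag_is_set()
other_view = observe_from_other_thread()
_state.in_guard = False

print(main_view, other_view)
```

Since each thread gets its own attribute namespace, a task running in one thread should not cause the safeguard check to be skipped for a task running in another.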
   
   I like the approach. You can open a PR and add your example as a test case;
   that way we are sure this case is not only fixed but also protected against
   regressions.


