dabla commented on issue #41470:
URL: https://github.com/apache/airflow/issues/41470#issuecomment-2401453034
> I've looked into this issue more deeply and I believe I can provide a solution. I could use some guidance on whether I should submit a PR for this.
>
> The problem, in my opinion, is that after `_execute_callable` is called, it enters the `ExecutorSafeguard` wrapper. There, it checks whether it is called inside a `TaskInstance` (hence the `_execute_callable`) and then returns the execution function itself. Because the `execute` method of `BigQueryTableExistenceSensor` is also decorated, it automatically re-enters the `ExecutorSafeguard` wrapper, but this time it appears as if it was called outside a `TaskInstance`, causing the check to fail. With `allow_nested_operators`, it only fires a warning instead of failing.
>
> My proposed solution is to check whether `ExecutorSafeguard` has already been entered in the current execution context. If so, we can skip the check for nested calls. This can be achieved by setting a flag in thread-local storage, so that concurrent threads never see each other's flag.
>
> ```python
> from functools import wraps
> from threading import local
>
> from airflow.configuration import conf
> from airflow.exceptions import AirflowException
>
> # `_sentinel` is assumed to be the module-level sentinel object defined
> # alongside ExecutorSafeguard in Airflow's baseoperator module.
>
>
> class ExecutorSafeguard:
>     """
>     The ExecutorSafeguard decorator.
>
>     Checks if the execute method of an operator isn't manually called outside
>     the TaskInstance as we want to avoid bad mixing between decorated and
>     classic operators.
>     """
>
>     test_mode = conf.getboolean("core", "unit_test_mode")
>     _local = local()
>
>     @classmethod
>     def decorator(cls, func):
>         @wraps(func)
>         def wrapper(self, *args, **kwargs):
>             if (
>                 getattr(ExecutorSafeguard._local, "in_executor_safeguard", False)
>                 and self.allow_nested_operators
>             ):
>                 # Already inside ExecutorSafeguard (nested call): run the function directly.
>                 return func(self, *args, **kwargs)
>             ExecutorSafeguard._local.in_executor_safeguard = True
>
>             try:
>                 from airflow.decorators.base import DecoratedOperator
>
>                 sentinel = kwargs.pop(f"{self.__class__.__name__}__sentinel", None)
>
>                 if (
>                     not cls.test_mode
>                     and sentinel != _sentinel
>                     and not isinstance(self, DecoratedOperator)
>                 ):
>                     message = (
>                         f"{self.__class__.__name__}.{func.__name__} cannot be called outside TaskInstance!"
>                     )
>                     raise AirflowException(message)
>                 return func(self, *args, **kwargs)
>             finally:
>                 ExecutorSafeguard._local.in_executor_safeguard = False
>
>         return wrapper
> ```
>
> This solution would suppress the warning for legitimate nested calls while still protecting against unintended external calls. Using `local()` keeps the flag per thread, so the check stays thread-safe.
>
> I'm willing to create a pull request with this solution if the maintainers think this approach is appropriate. What do you think about this solution? Are there any aspects I should consider or modify before proceeding with a PR?

I like the approach. You can open a PR and add your example as a test case; that way we are sure this case is not only fixed but also tested against regression.