stroykova commented on issue #14672:
URL: https://github.com/apache/airflow/issues/14672#issuecomment-943508653
I use Airflow in a data science project that makes heavy use of multiprocessing, and I hit the same issue with the SIGTERM signal. Here is a code example that shows how using multiprocessing produces SIGTERM in the spawned processes:
```python
from pebble import ProcessPool
import multiprocessing
import signal


def function(foo, bar=0):
    print(multiprocessing.current_process().pid)


def task_done(future):
    future.result()  # blocks until results are ready


def process():
    def signal_handler(signum, frame):
        raise ValueError("received SIGTERM signal")

    signal.signal(signal.SIGTERM, signal_handler)
    with ProcessPool(max_workers=5, max_tasks=10) as pool:
        for i in range(0, 10):
            future = pool.schedule(function, args=[i], timeout=10)
            future.add_done_callback(task_done)


if __name__ == "__main__":
    process()
```
I added a SIGTERM handler and I can see it fire in every spawned process, though I do not completely understand why this happens.
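One likely explanation (my reading, not confirmed by the original code): on POSIX, child processes created with `fork` inherit the parent's signal dispositions, so a handler installed in the parent, whether by my code or by Airflow, shows up in every worker. A minimal standalone sketch demonstrating the inheritance:

```python
import multiprocessing
import signal


def child_sees_custom_handler(queue):
    # Runs in the child: report whether SIGTERM still has a custom handler.
    queue.put(signal.getsignal(signal.SIGTERM) not in (signal.SIG_DFL, signal.SIG_IGN))


def handler_is_inherited():
    def handler(signum, frame):
        raise ValueError("received SIGTERM signal")

    signal.signal(signal.SIGTERM, handler)

    ctx = multiprocessing.get_context("fork")  # fork copies parent state, handlers included
    queue = ctx.Queue()
    child = ctx.Process(target=child_sees_custom_handler, args=(queue,))
    child.start()
    child.join()

    signal.signal(signal.SIGTERM, signal.SIG_DFL)  # restore the default in the parent
    return queue.get()


if __name__ == "__main__":
    print(handler_is_inherited())  # True: the child inherited the custom handler
```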
But I also know that Airflow listens for the SIGTERM signal and stops the task when it receives one. So I am not able to use the PythonOperator with multiprocessing, which is quite annoying.
The workaround is to wrap the Python code that uses multiprocessing in a BashOperator. Hope this helps someone.
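Another direction that might be worth trying (untested inside Airflow, so treat it as a sketch): have each worker restore the default SIGTERM disposition so the parent's handler is not inherited. The stdlib pool shown below takes an `initializer` argument for this; recent versions of pebble's `ProcessPool` accept a similar parameter, but check the version you use.

```python
import multiprocessing
import signal
from concurrent.futures import ProcessPoolExecutor


def _reset_sigterm():
    # Runs once in every freshly created worker process.
    signal.signal(signal.SIGTERM, signal.SIG_DFL)


def square(x):
    return x * x


def run():
    # Force the fork start method so the behavior matches the discussion above.
    ctx = multiprocessing.get_context("fork")
    with ProcessPoolExecutor(max_workers=2, mp_context=ctx,
                             initializer=_reset_sigterm) as pool:
        return list(pool.map(square, range(5)))


if __name__ == "__main__":
    print(run())  # [0, 1, 4, 9, 16]
```

Whether resetting the handler interferes with Airflow's own task management is exactly the kind of thing this issue should clarify.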
It would be great to be able to use the PythonOperator for such things. I hope this helps with investigating the issue; I think it should be discussed and maybe reopened.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]