[
https://issues.apache.org/jira/browse/AIRFLOW-3418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711972#comment-16711972
]
Gabriel Silk commented on AIRFLOW-3418:
---------------------------------------
I'm seeing this issue as well, and I would reiterate its criticality: it is
currently breaking our production clusters.
> Task stuck in running state, unable to clear
> --------------------------------------------
>
> Key: AIRFLOW-3418
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3418
> Project: Apache Airflow
> Issue Type: Bug
> Components: worker
> Affects Versions: 1.10.1
> Reporter: James Meickle
> Priority: Critical
>
> One of our tasks (a custom operator that sleep-waits until NYSE market close)
> got stuck in a "running" state in the metadata db without making any
> progress. This is what it looked like in the logs:
> {code:java}
> [2018-11-29 00:01:14,064] {{base_task_runner.py:101}} INFO - Job 38275:
> Subtask after_close [2018-11-29 00:01:14,063] {{cli.py:484}} INFO - Running
> <TaskInstance: reconciliation_filemover.after_close 2018-11-28T00:00:00+00:00
> [running]> on host airflow-core-i-0a53cac37067d957d.dlg.fnd.dynoquant.com
> [2018-11-29 06:03:57,643] {{models.py:1355}} INFO - Dependencies not met for
> <TaskInstance: reconciliation_filemover.after_close 2018-11-28T00:00:00+00:00
> [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running'
> state which is not a valid state for execution. The task must be cleared in
> order to be run.
> [2018-11-29 06:03:57,644] {{models.py:1355}} INFO - Dependencies not met for
> <TaskInstance: reconciliation_filemover.after_close 2018-11-28T00:00:00+00:00
> [running]>, dependency 'Task Instance Not Already Running' FAILED: Task is
> already running, it started on 2018-11-29 00:01:10.876344+00:00.
> [2018-11-29 06:03:57,646] {{logging_mixin.py:95}} INFO - [2018-11-29
> 06:03:57,646] {{jobs.py:2614}} INFO - Task is not able to be run
> {code}
> Seeing this state, we attempted to "clear" it in the web UI. This yielded a
> complex backtrace:
> {code:java}
> Traceback (most recent call last):
> File
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/app.py",
> line 1982, in wsgi_app
> response = self.full_dispatch_request()
> File
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/app.py",
> line 1614, in full_dispatch_request
> rv = self.handle_user_exception(e)
> File
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/app.py",
> line 1517, in handle_user_exception
> reraise(exc_type, exc_value, tb)
> File
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/_compat.py",
> line 33, in reraise
> raise value
> File
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/app.py",
> line 1612, in full_dispatch_request
> rv = self.dispatch_request()
> File
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/app.py",
> line 1598, in dispatch_request
> return self.view_functions[rule.endpoint](**req.view_args)
> File
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask_appbuilder/security/decorators.py",
> line 26, in wraps
> return f(self, *args, **kwargs)
> File
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/www_rbac/decorators.py",
> line 55, in wrapper
> return f(*args, **kwargs)
> File
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/www_rbac/views.py",
> line 837, in clear
> include_upstream=upstream)
> File
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py",
> line 4011, in sub_dag
> dag = copy.deepcopy(self)
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 166,
> in deepcopy
> y = copier(memo)
> File
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py",
> line 3996, in __deepcopy__
> setattr(result, k, copy.deepcopy(v, memo))
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155,
> in deepcopy
> y = copier(x, memo)
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243,
> in _deepcopy_dict
> y[deepcopy(key, memo)] = deepcopy(value, memo)
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 166,
> in deepcopy
> y = copier(memo)
> File
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py",
> line 2740, in __deepcopy__
> setattr(result, k, copy.deepcopy(v, memo))
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 182,
> in deepcopy
> y = _reconstruct(x, rv, 1, memo)
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 297,
> in _reconstruct
> state = deepcopy(state, memo)
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155,
> in deepcopy
> y = copier(x, memo)
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243,
> in _deepcopy_dict
> y[deepcopy(key, memo)] = deepcopy(value, memo)
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 182,
> in deepcopy
> y = _reconstruct(x, rv, 1, memo)
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 297,
> in _reconstruct
> state = deepcopy(state, memo)
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155,
> in deepcopy
> y = copier(x, memo)
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243,
> in _deepcopy_dict
> y[deepcopy(key, memo)] = deepcopy(value, memo)
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 182,
> in deepcopy
> y = _reconstruct(x, rv, 1, memo)
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 297,
> in _reconstruct
> state = deepcopy(state, memo)
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155,
> in deepcopy
> y = copier(x, memo)
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243,
> in _deepcopy_dict
> y[deepcopy(key, memo)] = deepcopy(value, memo)
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155,
> in deepcopy
> y = copier(x, memo)
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243,
> in _deepcopy_dict
> y[deepcopy(key, memo)] = deepcopy(value, memo)
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 182,
> in deepcopy
> y = _reconstruct(x, rv, 1, memo)
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 297,
> in _reconstruct
> state = deepcopy(state, memo)
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155,
> in deepcopy
> y = copier(x, memo)
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243,
> in _deepcopy_dict
> y[deepcopy(key, memo)] = deepcopy(value, memo)
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155,
> in deepcopy
> y = copier(x, memo)
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 218,
> in _deepcopy_list
> y.append(deepcopy(a, memo))
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 182,
> in deepcopy
> y = _reconstruct(x, rv, 1, memo)
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 297,
> in _reconstruct
> state = deepcopy(state, memo)
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155,
> in deepcopy
> y = copier(x, memo)
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243,
> in _deepcopy_dict
> y[deepcopy(key, memo)] = deepcopy(value, memo)
> File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 174,
> in deepcopy
> rv = reductor(4)
> TypeError: cannot serialize '_io.TextIOWrapper' object
> {code}
> After browsing through Airflow's code I had a suspicion that this was simply
> the "clear" code in the UI not handling some property on one of our
> operators. I instead used the Browse feature to edit the metadata state db
> directly. This did change the status: the task was set to "up_for_retry", and
> the same logfile gained the following additional contents:
> {code:java}
> [2018-11-29 14:18:11,390] {{logging_mixin.py:95}} INFO - [2018-11-29
> 14:18:11,390] {{jobs.py:2695}} WARNING - State of this instance has been
> externally set to failed. Taking the poison pill.
> [2018-11-29 14:18:11,399] {{helpers.py:240}} INFO - Sending Signals.SIGTERM
> to GPID 5287
> [2018-11-29 14:18:11,399] {{models.py:1636}} ERROR - Received SIGTERM.
> Terminating subprocesses.
> [2018-11-29 14:18:11,418] {{models.py:1760}} ERROR - Task received SIGTERM
> signal
> Traceback (most recent call last):
> File
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py",
> line 1654, in _run_raw_task
> result = task_copy.execute(context=context)
> File
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/sensors/base_sensor_operator.py",
> line 78, in execute
> sleep(self.poke_interval)
> File
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py",
> line 1638, in signal_handler
> raise AirflowException("Task received SIGTERM signal")
> airflow.exceptions.AirflowException: Task received SIGTERM signal
> [2018-11-29 14:18:11,420] {{models.py:1783}} INFO - Marking task as
> UP_FOR_RETRY
> [2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275:
> Subtask after_close Traceback (most recent call last):
> [2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275:
> Subtask after_close File "/home/airflow/virtualenvs/airflow/bin/airflow",
> line 32, in <module>
> [2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275:
> Subtask after_close args.func(args)
> [2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275:
> Subtask after_close File
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/utils/cli.py",
> line 74, in wrapper
> [2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275:
> Subtask after_close return f(*args, **kwargs)
> [2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275:
> Subtask after_close File
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/bin/cli.py",
> line 490, in run
> [2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275:
> Subtask after_close _run(args, dag, ti)
> [2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275:
> Subtask after_close File
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/bin/cli.py",
> line 406, in _run
> [2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275:
> Subtask after_close pool=args.pool,
> [2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275:
> Subtask after_close File
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/utils/db.py",
> line 74, in wrapper
> [2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275:
> Subtask after_close return func(*args, **kwargs)
> [2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275:
> Subtask after_close File
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py",
> line 1654, in _run_raw_task
> [2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275:
> Subtask after_close result = task_copy.execute(context=context)
> [2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275:
> Subtask after_close File
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/sensors/base_sensor_operator.py",
> line 78, in execute
> [2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275:
> Subtask after_close sleep(self.poke_interval)
> [2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275:
> Subtask after_close File
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py",
> line 1638, in signal_handler
> [2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275:
> Subtask after_close raise AirflowException("Task received SIGTERM signal")
> [2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275:
> Subtask after_close airflow.exceptions.AirflowException: Task received
> SIGTERM signal
> [2018-11-29 14:18:11,693] {{helpers.py:230}} INFO - Process
> psutil.Process(pid=5287 (terminated)) (5287) terminated with exit code 1
> [2018-11-29 14:18:11,694] {{logging_mixin.py:95}} INFO - [2018-11-29
> 14:18:11,693] {{jobs.py:2627}} INFO - Task exited with return code 0
> {code}
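[Editor's note] Given the diagnosis above (the "clear" path deep-copying the DAG and tripping over an un-copyable attribute), one operator-level workaround is to override `__deepcopy__` to skip the offending attribute, much as `BaseOperator.__deepcopy__` (the models.py:3996/2740 frames in the trace) iterates instance attributes. A sketch with a hypothetical operator, not the actual Airflow fix:

```python
import copy
import tempfile

class FileBackedOperator:
    """Hypothetical operator holding an open file handle; __deepcopy__
    copies everything except the handle, which a copy must reopen."""
    _SKIP_DEEPCOPY = {"log_handle"}

    def __init__(self, path):
        self.path = path
        self.log_handle = open(path, "a")

    def __deepcopy__(self, memo):
        cls = self.__class__
        result = cls.__new__(cls)
        memo[id(self)] = result
        for k, v in self.__dict__.items():
            # leave the raw handle out of the copy instead of crashing
            setattr(result, k,
                    None if k in self._SKIP_DEEPCOPY
                    else copy.deepcopy(v, memo))
        return result

tmp = tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False)
op = FileBackedOperator(tmp.name)
clone = copy.deepcopy(op)      # no TypeError this time
print(clone.path == op.path)   # True
print(clone.log_handle)        # None
```

Registering the copy in `memo` before recursing is what keeps cyclic references (common in DAG/task graphs) from recursing forever.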
> The log line about "not able to be run" comes from jobs.py, and it's unclear
> to me why that path would be hit in this case (two workers grabbing the same
> message...?) or why the task would simply hang in a "running" state:
> https://github.com/apache/incubator-airflow/blob/1.10.1/airflow/jobs.py#L2614
> We had not previously observed any of this behavior; we had only upgraded to
> 1.10.1 earlier this week.
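[Editor's note] The "poison pill" in the second log excerpt is Airflow's own mechanism: the raw task runner installs a SIGTERM handler (models.py:1638 in 1.10.1) that converts the signal into an `AirflowException`, which is why the sensor's `sleep(self.poke_interval)` unwinds with "Task received SIGTERM signal" and the task is marked UP_FOR_RETRY. A minimal sketch of that pattern (POSIX-only; `AirflowException` replaced with a plain `RuntimeError`):

```python
import os
import signal

def signal_handler(signum, frame):
    # mirrors airflow/models.py:1638 (1.10.1): convert SIGTERM into an
    # exception so the task body unwinds and retry handling can run
    raise RuntimeError("Task received SIGTERM signal")

signal.signal(signal.SIGTERM, signal_handler)

try:
    # stand-in for the external "poison pill" sent to the task's process group
    os.kill(os.getpid(), signal.SIGTERM)
except RuntimeError as exc:
    print(exc)  # Task received SIGTERM signal
```

Because the handler raises, the exception surfaces at whatever statement the process was executing, which is exactly how a long `sleep()` inside a sensor gets interrupted.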
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)