This is an automated email from the ASF dual-hosted git repository. jedcunningham pushed a commit to branch v2-2-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 2e8773b08e547ff6ca585cc0d5a35fccdaf2b7c5 Author: Ash Berlin-Taylor <[email protected]> AuthorDate: Wed Oct 13 15:00:13 2021 +0100 Don't bake ENV and _cmd into tmp config for non-sudo (#18772) If we are running tasks via sudo then AIRFLOW__ config env vars won't be visible anymore (without them showing up in `ps`) and we likely might not have permission to run the _cmd's specified to find the passwords. But if we are running as the same user then there is no need to "bake" those options in to the temporary config file -- if the operator decided they didn't want those values appearing in a config file on disk, then let's do our best to respect that. Note: this commit originally appeared in 2019 but a critical piece was missing, meaning that the secrets/envs were still actually appearing. (cherry picked from commit a90878cf660ffe73973b4e4487c1e691cc212925) --- airflow/task/task_runner/base_task_runner.py | 4 +- airflow/utils/configuration.py | 14 ++++++- tests/conftest.py | 5 +++ tests/task/task_runner/test_base_task_runner.py | 54 +++++++++++++++++++++++++ 4 files changed, 73 insertions(+), 4 deletions(-) diff --git a/airflow/task/task_runner/base_task_runner.py b/airflow/task/task_runner/base_task_runner.py index f70020d..28bb847 100644 --- a/airflow/task/task_runner/base_task_runner.py +++ b/airflow/task/task_runner/base_task_runner.py @@ -66,7 +66,7 @@ class BaseTaskRunner(LoggingMixin): # want to have to specify them in the sudo call - they would show # up in `ps` that way! 
And run commands now, as the other user # might not be able to run the cmds to get credentials - cfg_path = tmp_configuration_copy(chmod=0o600) + cfg_path = tmp_configuration_copy(chmod=0o600, include_env=True, include_cmds=True) # Give ownership of file to user; only they can read and write subprocess.call(['sudo', 'chown', self.run_as_user, cfg_path], close_fds=True) @@ -83,7 +83,7 @@ class BaseTaskRunner(LoggingMixin): # we are running as the same user, and can pass through environment # variables then we don't need to include those in the config copy # - the runner can read/execute those values as it needs - cfg_path = tmp_configuration_copy(chmod=0o600) + cfg_path = tmp_configuration_copy(chmod=0o600, include_env=False, include_cmds=False) self._error_file = NamedTemporaryFile(delete=True) if self.run_as_user: diff --git a/airflow/utils/configuration.py b/airflow/utils/configuration.py index 7057d64..cc9273c 100644 --- a/airflow/utils/configuration.py +++ b/airflow/utils/configuration.py @@ -23,13 +23,23 @@ from tempfile import mkstemp from airflow.configuration import conf -def tmp_configuration_copy(chmod=0o600): +def tmp_configuration_copy(chmod=0o600, include_env=True, include_cmds=True): """ Returns a path for a temporary file including a full copy of the configuration settings. 
+ + :param include_env: Should the value of configuration from ``AIRFLOW__`` + environment variables be included or not + :type include_env: bool + :param include_cmds: Should the result of calling any *_cmd config be + set (True, default), or should the _cmd options be left as the + command to run (False) + :type include_cmds: bool :return: a path to a temporary file """ - cfg_dict = conf.as_dict(display_sensitive=True, raw=True) + cfg_dict = conf.as_dict( + display_sensitive=True, raw=True, include_cmds=include_cmds, include_env=include_env + ) temp_fd, cfg_path = mkstemp() with os.fdopen(temp_fd, 'w') as temp_file: diff --git a/tests/conftest.py b/tests/conftest.py index 22770c5..94f915f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -476,6 +476,8 @@ def dag_maker(request): from airflow.utils.log.logging_mixin import LoggingMixin class DagFactory(LoggingMixin): + _own_session = False + def __init__(self): from airflow.models import DagBag @@ -577,6 +579,7 @@ def dag_maker(request): from airflow.utils import timezone if session is None: + self._own_session = True session = settings.Session() self.kwargs = kwargs @@ -629,6 +632,8 @@ def dag_maker(request): synchronize_session=False ) self.session.commit() + if self._own_session: + self.session.expunge_all() factory = DagFactory() diff --git a/tests/task/task_runner/test_base_task_runner.py b/tests/task/task_runner/test_base_task_runner.py new file mode 100644 index 0000000..499bba7 --- /dev/null +++ b/tests/task/task_runner/test_base_task_runner.py @@ -0,0 +1,54 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from unittest import mock + +import pytest + +from airflow.jobs.local_task_job import LocalTaskJob +from airflow.models.baseoperator import BaseOperator +from airflow.task.task_runner.base_task_runner import BaseTaskRunner + + +@pytest.mark.parametrize(["impersonation"], (("nobody",), (None,))) +@mock.patch('subprocess.call') +@mock.patch('os.chown') +@mock.patch('airflow.task.task_runner.base_task_runner.tmp_configuration_copy') +def test_config_copy_mode(tmp_configuration_copy, chown, subprocess_call, dag_maker, impersonation): + with dag_maker("test"): + BaseOperator(task_id="task_1", run_as_user=impersonation) + + dr = dag_maker.create_dagrun() + + ti = dr.task_instances[0] + job = LocalTaskJob(ti) + runner = BaseTaskRunner(job) + # So we don't try to delete it -- cos the file wont exist + del runner._cfg_path + + includes = bool(impersonation) + + tmp_configuration_copy.assert_called_with(chmod=0o600, include_env=includes, include_cmds=includes) + + if impersonation: + chown.assert_called() + subprocess_call.assert_called_with( + ['sudo', 'chown', impersonation, tmp_configuration_copy.return_value], close_fds=True + ) + else: + chown.assert_not_called() + subprocess_call.not_assert_called()
