This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2e8773b08e547ff6ca585cc0d5a35fccdaf2b7c5
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Wed Oct 13 15:00:13 2021 +0100

    Don't bake ENV and _cmd into tmp config for non-sudo (#18772)
    
    If we are running tasks via sudo then AIRFLOW__ config env vars won't be
    visible anymore (without them showing up in `ps`) and we likely might
    not have permission to run the _cmd's specified to find the passwords.
    
    But if we are running as the same user then there is no need to "bake"
    those options into the temporary config file -- if the operator decided
    they didn't want those values appearing in a config file on disk, then
    let's do our best to respect that.
    
    Note: this commit originally appeared in 2019 but a critical piece was
    missing, meaning that the secrets/envs were still actually appearing.
    
    (cherry picked from commit a90878cf660ffe73973b4e4487c1e691cc212925)
---
 airflow/task/task_runner/base_task_runner.py    |  4 +-
 airflow/utils/configuration.py                  | 14 ++++++-
 tests/conftest.py                               |  5 +++
 tests/task/task_runner/test_base_task_runner.py | 54 +++++++++++++++++++++++++
 4 files changed, 73 insertions(+), 4 deletions(-)

diff --git a/airflow/task/task_runner/base_task_runner.py 
b/airflow/task/task_runner/base_task_runner.py
index f70020d..28bb847 100644
--- a/airflow/task/task_runner/base_task_runner.py
+++ b/airflow/task/task_runner/base_task_runner.py
@@ -66,7 +66,7 @@ class BaseTaskRunner(LoggingMixin):
             # want to have to specify them in the sudo call - they would show
             # up in `ps` that way! And run commands now, as the other user
             # might not be able to run the cmds to get credentials
-            cfg_path = tmp_configuration_copy(chmod=0o600)
+            cfg_path = tmp_configuration_copy(chmod=0o600, include_env=True, 
include_cmds=True)
 
             # Give ownership of file to user; only they can read and write
             subprocess.call(['sudo', 'chown', self.run_as_user, cfg_path], 
close_fds=True)
@@ -83,7 +83,7 @@ class BaseTaskRunner(LoggingMixin):
             # we are running as the same user, and can pass through environment
             # variables then we don't need to include those in the config copy
             # - the runner can read/execute those values as it needs
-            cfg_path = tmp_configuration_copy(chmod=0o600)
+            cfg_path = tmp_configuration_copy(chmod=0o600, include_env=False, 
include_cmds=False)
 
         self._error_file = NamedTemporaryFile(delete=True)
         if self.run_as_user:
diff --git a/airflow/utils/configuration.py b/airflow/utils/configuration.py
index 7057d64..cc9273c 100644
--- a/airflow/utils/configuration.py
+++ b/airflow/utils/configuration.py
@@ -23,13 +23,23 @@ from tempfile import mkstemp
 from airflow.configuration import conf
 
 
-def tmp_configuration_copy(chmod=0o600):
+def tmp_configuration_copy(chmod=0o600, include_env=True, include_cmds=True):
     """
     Returns a path for a temporary file including a full copy of the 
configuration
     settings.
+
+    :param include_env: Should the value of configuration from ``AIRFLOW__``
+        environment variables be included or not
+    :type include_env: bool
+    :param include_cmds: Should the result of calling any *_cmd config be
+        set (True, default), or should the _cmd options be left as the
+        command to run (False)
+    :type include_cmds: bool
     :return: a path to a temporary file
     """
-    cfg_dict = conf.as_dict(display_sensitive=True, raw=True)
+    cfg_dict = conf.as_dict(
+        display_sensitive=True, raw=True, include_cmds=include_cmds, 
include_env=include_env
+    )
     temp_fd, cfg_path = mkstemp()
 
     with os.fdopen(temp_fd, 'w') as temp_file:
diff --git a/tests/conftest.py b/tests/conftest.py
index 22770c5..94f915f 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -476,6 +476,8 @@ def dag_maker(request):
     from airflow.utils.log.logging_mixin import LoggingMixin
 
     class DagFactory(LoggingMixin):
+        _own_session = False
+
         def __init__(self):
             from airflow.models import DagBag
 
@@ -577,6 +579,7 @@ def dag_maker(request):
             from airflow.utils import timezone
 
             if session is None:
+                self._own_session = True
                 session = settings.Session()
 
             self.kwargs = kwargs
@@ -629,6 +632,8 @@ def dag_maker(request):
                         synchronize_session=False
                     )
                     self.session.commit()
+                    if self._own_session:
+                        self.session.expunge_all()
 
     factory = DagFactory()
 
diff --git a/tests/task/task_runner/test_base_task_runner.py 
b/tests/task/task_runner/test_base_task_runner.py
new file mode 100644
index 0000000..499bba7
--- /dev/null
+++ b/tests/task/task_runner/test_base_task_runner.py
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from unittest import mock
+
+import pytest
+
+from airflow.jobs.local_task_job import LocalTaskJob
+from airflow.models.baseoperator import BaseOperator
+from airflow.task.task_runner.base_task_runner import BaseTaskRunner
+
+
+@pytest.mark.parametrize(["impersonation"], (("nobody",), (None,)))
+@mock.patch('subprocess.call')
+@mock.patch('os.chown')
+@mock.patch('airflow.task.task_runner.base_task_runner.tmp_configuration_copy')
+def test_config_copy_mode(tmp_configuration_copy, chown, subprocess_call, dag_maker, impersonation):
+    """Env vars and _cmd results are baked into the temp config copy only when impersonating."""
+    with dag_maker("test"):
+        BaseOperator(task_id="task_1", run_as_user=impersonation)
+    dr = dag_maker.create_dagrun()
+
+    ti = dr.task_instances[0]
+    job = LocalTaskJob(ti)
+    runner = BaseTaskRunner(job)
+    # tmp_configuration_copy is mocked so no file exists; drop the path so we don't try to delete it
+    del runner._cfg_path
+
+    # Both include_* flags are expected to be True only when a run_as_user is set
+    includes = bool(impersonation)
+    tmp_configuration_copy.assert_called_with(chmod=0o600, include_env=includes, include_cmds=includes)
+
+    if impersonation:
+        chown.assert_called()
+        subprocess_call.assert_called_with(
+            ['sudo', 'chown', impersonation, tmp_configuration_copy.return_value], close_fds=True
+        )
+    else:
+        chown.assert_not_called()
+        subprocess_call.assert_not_called()

Reply via email to