This is an automated email from the ASF dual-hosted git repository.

tomaz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/libcloud.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bc224737e Make _init_once() tests more robust by monkey patching 
os.environ (no chance of a race when running tests in multiple worker 
processes) and by resetting debug functionality before each test run to ensure 
each function runs in a clean environment.
bc224737e is described below

commit bc224737e9ee07ab3e76f388d7b422ecd1f04b79
Author: Tomaz Muraus <to...@tomaz.me>
AuthorDate: Sun Apr 14 22:16:08 2024 +0200

    Make _init_once() tests more robust by monkey patching os.environ (no
    chance of a race when running tests in multiple worker processes) and by
    resetting debug functionality before each test run to ensure each
    function runs in a clean environment.
---
 libcloud/__init__.py       | 13 +++++++++++++
 libcloud/test/test_init.py | 32 ++++++++++++++++++++++++--------
 2 files changed, 37 insertions(+), 8 deletions(-)

diff --git a/libcloud/__init__.py b/libcloud/__init__.py
index 39e5507bc..1f4e9c22e 100644
--- a/libcloud/__init__.py
+++ b/libcloud/__init__.py
@@ -65,6 +65,19 @@ def enable_debug(fo):
     atexit.register(close_file, fo)
 
 
+def reset_debug():
+    """
+    Reset debugging functionality (if set).
+
+    NOTE: This function is only meant to be used in the tests.
+    """
+    from libcloud.common.base import Connection, LibcloudConnection
+    from libcloud.utils.loggingconnection import LoggingConnection
+
+    LoggingConnection.log = None
+    Connection.conn_class = LibcloudConnection
+
+
 def _init_once():
     """
     Utility function that is ran once on Library import.
diff --git a/libcloud/test/test_init.py b/libcloud/test/test_init.py
index 418dcdf4c..2d63ca29d 100644
--- a/libcloud/test/test_init.py
+++ b/libcloud/test/test_init.py
@@ -17,12 +17,14 @@ import os
 import sys
 import logging
 import tempfile
+from unittest import mock
 from unittest.mock import patch
 
 import libcloud
-from libcloud import _init_once
+from libcloud import _init_once, reset_debug
 from libcloud.base import DriverTypeNotFoundError
 from libcloud.test import unittest
+from libcloud.common.base import LibcloudConnection, Connection
 from libcloud.utils.loggingconnection import LoggingConnection
 
 try:
@@ -32,32 +34,46 @@ try:
 except ImportError:
     have_paramiko = False
 
+_, TEMP_LOGFILE_PATH = tempfile.mkstemp()
+
 
 class TestUtils(unittest.TestCase):
-    def tearDown(self):
-        if "LIBCLOUD_DEBUG" in os.environ:
-            del os.environ["LIBCLOUD_DEBUG"]
+    def setUp(self):
+        reset_debug()
 
-    def test_init_once_and_debug_mode(self):
+    @mock.patch.dict(os.environ, {"LIBCLOUD_DEBUG": ""}, clear=True)
+    def test_init_once_and_no_debug_mode(self):
         if have_paramiko:
             paramiko_logger = logging.getLogger("paramiko")
             paramiko_logger.setLevel(logging.INFO)
 
+        self.assertIsNone(LoggingConnection.log)
+        self.assertEqual(Connection.conn_class, LibcloudConnection)
+
         # Debug mode is disabled
         _init_once()
 
         self.assertIsNone(LoggingConnection.log)
+        self.assertEqual(Connection.conn_class, LibcloudConnection)
 
         if have_paramiko:
             paramiko_log_level = paramiko_logger.getEffectiveLevel()
             self.assertEqual(paramiko_log_level, logging.INFO)
 
-        # Enable debug mode
-        _, tmp_path = tempfile.mkstemp()
-        os.environ["LIBCLOUD_DEBUG"] = tmp_path
+    @mock.patch.dict(os.environ, {"LIBCLOUD_DEBUG": TEMP_LOGFILE_PATH}, 
clear=True)
+    def test_init_once_and_debug_mode(self):
+        if have_paramiko:
+            paramiko_logger = logging.getLogger("paramiko")
+            paramiko_logger.setLevel(logging.INFO)
+
+        self.assertIsNone(LoggingConnection.log)
+        self.assertEqual(Connection.conn_class, LibcloudConnection)
+
+        # Debug mode is enabled
         _init_once()
 
         self.assertTrue(LoggingConnection.log is not None)
+        self.assertEqual(Connection.conn_class, LoggingConnection)
 
         if have_paramiko:
             paramiko_log_level = paramiko_logger.getEffectiveLevel()

Reply via email to