This is an automated email from the ASF dual-hosted git repository.

beto pushed a commit to branch engine-manager
in repository https://gitbox.apache.org/repos/asf/superset.git

commit bb5a15dc5a60c7894a3b38d6bcdea5018ca9864f
Author: Beto Dealmeida <[email protected]>
AuthorDate: Wed Dec 3 14:57:02 2025 -0500

    Cleanup
---
 superset/config.py                    |  5 +++--
 superset/engines/manager.py           | 31 +++++++++++++++++++------------
 superset/extensions/engine_manager.py | 25 +++++++------------------
 superset/models/core.py               | 12 ++++--------
 4 files changed, 33 insertions(+), 40 deletions(-)

diff --git a/superset/config.py b/superset/config.py
index 3b0e3146197..ee4dc92feca 100644
--- a/superset/config.py
+++ b/superset/config.py
@@ -52,6 +52,7 @@ from superset.advanced_data_type.plugins.internet_address 
import internet_addres
 from superset.advanced_data_type.plugins.internet_port import internet_port
 from superset.advanced_data_type.types import AdvancedDataType
 from superset.constants import CHANGE_ME_SECRET_KEY
+from superset.engines.manager import EngineModes
 from superset.jinja_context import BaseTemplateProcessor
 from superset.key_value.types import JsonKeyValueCodec
 from superset.stats_logger import DummyStatsLogger
@@ -266,10 +267,10 @@ SQLALCHEMY_CUSTOM_PASSWORD_STORE = None
 
 # Engine manager mode: "NEW" creates a new engine for every connection 
(default),
 # "SINGLETON" reuses engines with connection pooling
-ENGINE_MANAGER_MODE = "NEW"
+ENGINE_MANAGER_MODE = EngineModes.NEW
 
 # Cleanup interval for abandoned locks in seconds (default: 5 minutes)
-ENGINE_MANAGER_CLEANUP_INTERVAL = 300.0
+ENGINE_MANAGER_CLEANUP_INTERVAL = timedelta(minutes=5)
 
 # Automatically start cleanup thread for SINGLETON mode (default: True)
 ENGINE_MANAGER_AUTO_START_CLEANUP = True
diff --git a/superset/engines/manager.py b/superset/engines/manager.py
index 37be4641cb7..cbe855fd1fd 100644
--- a/superset/engines/manager.py
+++ b/superset/engines/manager.py
@@ -20,6 +20,7 @@ import logging
 import threading
 from collections import defaultdict
 from contextlib import contextmanager
+from datetime import timedelta
 from io import StringIO
 from typing import Any, TYPE_CHECKING
 
@@ -71,7 +72,7 @@ class EngineManager:
     def __init__(
         self,
         mode: EngineModes = EngineModes.NEW,
-        cleanup_interval: float = 300.0,  # 5 minutes default
+        cleanup_interval: timedelta = timedelta(minutes=5),
     ) -> None:
         self.mode = mode
         self.cleanup_interval = cleanup_interval
@@ -100,7 +101,7 @@ class EngineManager:
         except Exception as ex:
             # Avoid exceptions during garbage collection, but log if possible
             try:
-                logger.warning(f"Error stopping cleanup thread: {ex}")
+                logger.warning("Error stopping cleanup thread: %s", ex)
             except Exception:  # noqa: S110
                 # If logging fails during destruction, we can't do anything
                 pass
@@ -212,6 +213,10 @@ class EngineManager:
         needed, since it needs to connect to the tunnel instead of the 
original DB. But
         that information is only available after the tunnel is created.
         """
+        # Import here to avoid circular imports
+        from superset.extensions import security_manager
+        from superset.utils.feature_flag_manager import FeatureFlagManager
+
         uri = make_url_safe(database.sqlalchemy_uri_decrypted)
 
         extra = database.get_extra(source)
@@ -242,10 +247,6 @@ class EngineManager:
         # get effective username
         username = database.get_effective_user(uri)
 
-        # Import here to avoid circular imports
-        from superset.extensions import security_manager
-        from superset.utils.feature_flag_manager import FeatureFlagManager
-
         feature_flag_manager = FeatureFlagManager()
         if username and feature_flag_manager.is_feature_enabled(
             "IMPERSONATE_WITH_EMAIL_PREFIX"
@@ -322,7 +323,6 @@ class EngineManager:
             user_id,
         )
 
-        tunnel = None
         if database.ssh_tunnel:
             tunnel = self._get_tunnel(database.ssh_tunnel, uri)
             uri = uri.set(
@@ -444,7 +444,8 @@ class EngineManager:
                 )
                 self._cleanup_thread.start()
                 logger.info(
-                    f"Started cleanup thread with {self.cleanup_interval}s 
interval"
+                    "Started cleanup thread with %ds interval",
+                    self.cleanup_interval.total_seconds(),
                 )
 
     def stop_cleanup_thread(self) -> None:
@@ -475,7 +476,9 @@ class EngineManager:
                 logger.exception("Error during background cleanup")
 
             # Use wait() instead of sleep() to allow for immediate shutdown
-            if self._cleanup_stop_event.wait(timeout=self.cleanup_interval):
+            if self._cleanup_stop_event.wait(
+                timeout=self.cleanup_interval.total_seconds()
+            ):
                 break  # Stop event was set
 
     def cleanup(self) -> None:
@@ -502,7 +505,8 @@ class EngineManager:
 
         if abandoned_engine_locks:
             logger.debug(
-                f"Cleaned up {len(abandoned_engine_locks)} abandoned engine 
locks"
+                "Cleaned up %d abandoned engine locks",
+                len(abandoned_engine_locks),
             )
 
         # Clean up tunnel locks
@@ -513,7 +517,8 @@ class EngineManager:
 
         if abandoned_tunnel_locks:
             logger.debug(
-                f"Cleaned up {len(abandoned_tunnel_locks)} abandoned tunnel 
locks"
+                "Cleaned up %d abandoned tunnel locks",
+                len(abandoned_tunnel_locks),
             )
 
     def _add_disposal_listener(self, engine: Engine, engine_key: EngineKey) -> 
None:
@@ -522,7 +527,9 @@ class EngineManager:
             try:
                 # `pop` is atomic -- no lock needed
                 if self._engines.pop(engine_key, None):
-                    logger.info(f"Engine disposed and removed from cache: 
{engine_key}")
+                    logger.info(
+                        "Engine disposed and removed from cache: %s", 
engine_key
+                    )
                     self._engine_locks.pop(engine_key, None)
             except Exception as ex:
                 logger.error(
diff --git a/superset/extensions/engine_manager.py 
b/superset/extensions/engine_manager.py
index 5c07d1cdd9e..c341364e2a5 100644
--- a/superset/extensions/engine_manager.py
+++ b/superset/extensions/engine_manager.py
@@ -45,24 +45,12 @@ class EngineManagerExtension:
         Initialize the EngineManager with Flask app configuration.
         """
         # Get configuration values with defaults
-        mode_name = app.config.get("ENGINE_MANAGER_MODE", "NEW")
-        cleanup_interval = app.config.get("ENGINE_MANAGER_CLEANUP_INTERVAL", 
300.0)
-        auto_start_cleanup = 
app.config.get("ENGINE_MANAGER_AUTO_START_CLEANUP", True)
-
-        # Convert mode string to enum
-        try:
-            mode = EngineModes[mode_name.upper()]
-        except KeyError:
-            logger.warning(
-                f"Invalid ENGINE_MANAGER_MODE '{mode_name}', defaulting to NEW"
-            )
-            mode = EngineModes.NEW
+        mode = app.config["ENGINE_MANAGER_MODE"]
+        cleanup_interval = app.config["ENGINE_MANAGER_CLEANUP_INTERVAL"]
+        auto_start_cleanup = app.config["ENGINE_MANAGER_AUTO_START_CLEANUP"]
 
         # Create the engine manager
-        self.engine_manager = EngineManager(
-            mode=mode,
-            cleanup_interval=cleanup_interval,
-        )
+        self.engine_manager = EngineManager(mode, cleanup_interval)
 
         # Start cleanup thread if requested and in SINGLETON mode
         if auto_start_cleanup and mode == EngineModes.SINGLETON:
@@ -83,8 +71,9 @@ class EngineManagerExtension:
         atexit.register(shutdown_engine_manager)
 
         logger.info(
-            f"Initialized EngineManager with mode={mode.name}, "
-            f"cleanup_interval={cleanup_interval}s"
+            "Initialized EngineManager with mode=%s, cleanup_interval=%ds",
+            mode,
+            cleanup_interval.total_seconds(),
         )
 
     @property
diff --git a/superset/models/core.py b/superset/models/core.py
index 18a44e66cd5..c6dcd9ad05b 100755
--- a/superset/models/core.py
+++ b/superset/models/core.py
@@ -136,9 +136,7 @@ class ConfigurationMethod(StrEnum):
     DYNAMIC_FORM = "dynamic_form"
 
 
-class Database(
-    CoreDatabase, AuditMixinNullable, ImportExportMixin
-):  # pylint: disable=too-many-public-methods
+class Database(CoreDatabase, AuditMixinNullable, ImportExportMixin):  # 
pylint: disable=too-many-public-methods
     """An ORM object that stores Database related information"""
 
     __tablename__ = "dbs"
@@ -415,7 +413,9 @@ class Database(
         return (
             username
             if (username := get_username())
-            else object_url.username if self.impersonate_user else None
+            else object_url.username
+            if self.impersonate_user
+            else None
         )
 
     @contextmanager
@@ -431,9 +431,6 @@ class Database(
         This method will return a context manager for a SQLAlchemy engine. The 
engine
         manager handles connection pooling, SSH tunnels, and other connection 
details
         based on the configured mode (NEW or SINGLETON).
-
-        Note: The nullpool parameter is kept for backwards compatibility but 
is ignored.
-        Pool configuration is now read from the database's extra configuration.
         """
         # Import here to avoid circular imports
         from superset.extensions import engine_manager_extension
@@ -470,7 +467,6 @@ class Database(
         self,
         catalog: str | None = None,
         schema: str | None = None,
-        nullpool: bool = True,  # Kept for backwards compatibility, but ignored
         source: utils.QuerySource | None = None,
     ) -> Connection:
         with self.get_sqla_engine(

Reply via email to