This is an automated email from the ASF dual-hosted git repository. beto pushed a commit to branch engine-manager in repository https://gitbox.apache.org/repos/asf/superset.git
commit bb5a15dc5a60c7894a3b38d6bcdea5018ca9864f Author: Beto Dealmeida <[email protected]> AuthorDate: Wed Dec 3 14:57:02 2025 -0500 Cleanup --- superset/config.py | 5 +++-- superset/engines/manager.py | 31 +++++++++++++++++++------------ superset/extensions/engine_manager.py | 25 +++++++------------------ superset/models/core.py | 12 ++++-------- 4 files changed, 33 insertions(+), 40 deletions(-) diff --git a/superset/config.py b/superset/config.py index 3b0e3146197..ee4dc92feca 100644 --- a/superset/config.py +++ b/superset/config.py @@ -52,6 +52,7 @@ from superset.advanced_data_type.plugins.internet_address import internet_addres from superset.advanced_data_type.plugins.internet_port import internet_port from superset.advanced_data_type.types import AdvancedDataType from superset.constants import CHANGE_ME_SECRET_KEY +from superset.engines.manager import EngineModes from superset.jinja_context import BaseTemplateProcessor from superset.key_value.types import JsonKeyValueCodec from superset.stats_logger import DummyStatsLogger @@ -266,10 +267,10 @@ SQLALCHEMY_CUSTOM_PASSWORD_STORE = None # Engine manager mode: "NEW" creates a new engine for every connection (default), # "SINGLETON" reuses engines with connection pooling -ENGINE_MANAGER_MODE = "NEW" +ENGINE_MANAGER_MODE = EngineModes.NEW # Cleanup interval for abandoned locks in seconds (default: 5 minutes) -ENGINE_MANAGER_CLEANUP_INTERVAL = 300.0 +ENGINE_MANAGER_CLEANUP_INTERVAL = timedelta(minutes=5) # Automatically start cleanup thread for SINGLETON mode (default: True) ENGINE_MANAGER_AUTO_START_CLEANUP = True diff --git a/superset/engines/manager.py b/superset/engines/manager.py index 37be4641cb7..cbe855fd1fd 100644 --- a/superset/engines/manager.py +++ b/superset/engines/manager.py @@ -20,6 +20,7 @@ import logging import threading from collections import defaultdict from contextlib import contextmanager +from datetime import timedelta from io import StringIO from typing import Any, TYPE_CHECKING @@ -71,7 +72,7 @@ class EngineManager: def __init__( self, mode: EngineModes = EngineModes.NEW, - cleanup_interval: float = 300.0, # 5 minutes default + cleanup_interval: timedelta = timedelta(minutes=5), ) -> None: self.mode = mode self.cleanup_interval = cleanup_interval @@ -100,7 +101,7 @@ class EngineManager: except Exception as ex: # Avoid exceptions during garbage collection, but log if possible try: - logger.warning(f"Error stopping cleanup thread: {ex}") + logger.warning("Error stopping cleanup thread: %s", ex) except Exception: # noqa: S110 # If logging fails during destruction, we can't do anything pass @@ -212,6 +213,10 @@ class EngineManager: needed, since it needs to connect to the tunnel instead of the original DB. But that information is only available after the tunnel is created. """ + # Import here to avoid circular imports + from superset.extensions import security_manager + from superset.utils.feature_flag_manager import FeatureFlagManager + uri = make_url_safe(database.sqlalchemy_uri_decrypted) extra = database.get_extra(source) @@ -242,10 +247,6 @@ class EngineManager: # get effective username username = database.get_effective_user(uri) - # Import here to avoid circular imports - from superset.extensions import security_manager - from superset.utils.feature_flag_manager import FeatureFlagManager - feature_flag_manager = FeatureFlagManager() if username and feature_flag_manager.is_feature_enabled( "IMPERSONATE_WITH_EMAIL_PREFIX" @@ -322,7 +323,6 @@ class EngineManager: user_id, ) - tunnel = None if database.ssh_tunnel: tunnel = self._get_tunnel(database.ssh_tunnel, uri) uri = uri.set( @@ -444,7 +444,8 @@ class EngineManager: ) self._cleanup_thread.start() logger.info( - f"Started cleanup thread with {self.cleanup_interval}s interval" + "Started cleanup thread with %ds interval", + self.cleanup_interval.total_seconds(), ) def stop_cleanup_thread(self) -> None: @@ -475,7 +476,9 @@ class EngineManager: logger.exception("Error during background cleanup") # Use wait() instead of sleep() to allow for immediate shutdown - if self._cleanup_stop_event.wait(timeout=self.cleanup_interval): + if self._cleanup_stop_event.wait( + timeout=self.cleanup_interval.total_seconds() + ): break # Stop event was set def cleanup(self) -> None: @@ -502,7 +505,8 @@ class EngineManager: if abandoned_engine_locks: logger.debug( - f"Cleaned up {len(abandoned_engine_locks)} abandoned engine locks" + "Cleaned up %d abandoned engine locks", + len(abandoned_engine_locks), ) # Clean up tunnel locks @@ -513,7 +517,8 @@ class EngineManager: if abandoned_tunnel_locks: logger.debug( - f"Cleaned up {len(abandoned_tunnel_locks)} abandoned tunnel locks" + "Cleaned up %d abandoned tunnel locks", + len(abandoned_tunnel_locks), ) def _add_disposal_listener(self, engine: Engine, engine_key: EngineKey) -> None: @@ -522,7 +527,9 @@ class EngineManager: try: # `pop` is atomic -- no lock needed if self._engines.pop(engine_key, None): - logger.info(f"Engine disposed and removed from cache: {engine_key}") + logger.info( + "Engine disposed and removed from cache: %s", engine_key + ) self._engine_locks.pop(engine_key, None) except Exception as ex: logger.error( diff --git a/superset/extensions/engine_manager.py b/superset/extensions/engine_manager.py index 5c07d1cdd9e..c341364e2a5 100644 --- a/superset/extensions/engine_manager.py +++ b/superset/extensions/engine_manager.py @@ -45,24 +45,12 @@ class EngineManagerExtension: Initialize the EngineManager with Flask app configuration. """ # Get configuration values with defaults - mode_name = app.config.get("ENGINE_MANAGER_MODE", "NEW") - cleanup_interval = app.config.get("ENGINE_MANAGER_CLEANUP_INTERVAL", 300.0) - auto_start_cleanup = app.config.get("ENGINE_MANAGER_AUTO_START_CLEANUP", True) - - # Convert mode string to enum - try: - mode = EngineModes[mode_name.upper()] - except KeyError: - logger.warning( - f"Invalid ENGINE_MANAGER_MODE '{mode_name}', defaulting to NEW" - ) - mode = EngineModes.NEW + mode = app.config["ENGINE_MANAGER_MODE"] + cleanup_interval = app.config["ENGINE_MANAGER_CLEANUP_INTERVAL"] + auto_start_cleanup = app.config["ENGINE_MANAGER_AUTO_START_CLEANUP"] # Create the engine manager - self.engine_manager = EngineManager( - mode=mode, - cleanup_interval=cleanup_interval, - ) + self.engine_manager = EngineManager(mode, cleanup_interval) # Start cleanup thread if requested and in SINGLETON mode if auto_start_cleanup and mode == EngineModes.SINGLETON: @@ -83,8 +71,9 @@ class EngineManagerExtension: atexit.register(shutdown_engine_manager) logger.info( - f"Initialized EngineManager with mode={mode.name}, " - f"cleanup_interval={cleanup_interval}s" + "Initialized EngineManager with mode=%s, cleanup_interval=%ds", + mode, + cleanup_interval.total_seconds(), ) @property diff --git a/superset/models/core.py b/superset/models/core.py index 18a44e66cd5..c6dcd9ad05b 100755 --- a/superset/models/core.py +++ b/superset/models/core.py @@ -136,9 +136,7 @@ class ConfigurationMethod(StrEnum): DYNAMIC_FORM = "dynamic_form" -class Database( - CoreDatabase, AuditMixinNullable, ImportExportMixin -): # pylint: disable=too-many-public-methods +class Database(CoreDatabase, AuditMixinNullable, ImportExportMixin): # pylint: disable=too-many-public-methods """An ORM object that stores Database related information""" __tablename__ = "dbs" @@ -415,7 +413,9 @@ class Database( return ( username if (username := get_username()) - else object_url.username if self.impersonate_user else None + else object_url.username + if self.impersonate_user + else None ) @contextmanager @@ -431,9 +431,6 @@ class Database( This method will return a context manager for a SQLAlchemy engine. The engine manager handles connection pooling, SSH tunnels, and other connection details based on the configured mode (NEW or SINGLETON). - - Note: The nullpool parameter is kept for backwards compatibility but is ignored. - Pool configuration is now read from the database's extra configuration. """ # Import here to avoid circular imports from superset.extensions import engine_manager_extension @@ -470,7 +467,6 @@ class Database( self, catalog: str | None = None, schema: str | None = None, - nullpool: bool = True, # Kept for backwards compatibility, but ignored source: utils.QuerySource | None = None, ) -> Connection: with self.get_sqla_engine(
