This is an automated email from the ASF dual-hosted git repository. beto pushed a commit to branch engine-manager in repository https://gitbox.apache.org/repos/asf/superset.git
commit 929b0337f483b93af8ac94d2d41969c420e29960 Author: Beto Dealmeida <[email protected]> AuthorDate: Tue Jul 29 20:28:33 2025 -0400 Connecting --- superset/engines/manager.py | 7 ++- superset/models/core.py | 145 +++++++------------------------------------- 2 files changed, 25 insertions(+), 127 deletions(-) diff --git a/superset/engines/manager.py b/superset/engines/manager.py index 9f2b41caad6..37be4641cb7 100644 --- a/superset/engines/manager.py +++ b/superset/engines/manager.py @@ -62,9 +62,10 @@ class EngineManager: This class handles the creation and management of SQLAlchemy engines, allowing them to be configured with connection pools and reused across requests. The default mode - is the default behavior for Superset, where we create a new engine for every - connection, using a NullPool. The `SINGLETON` mode allows for reusing of the - engines, as well as configuring the pool through the database settings. + is the original behavior for Superset, where we create a new engine for every + connection, using a NullPool. The `SINGLETON` mode, on the other hand, allows for + reusing of the engines, as well as configuring the pool through the database + settings. """ def __init__( diff --git a/superset/models/core.py b/superset/models/core.py index b9b7b605911..18a44e66cd5 100755 --- a/superset/models/core.py +++ b/superset/models/core.py @@ -25,7 +25,7 @@ import builtins import logging import textwrap from ast import literal_eval -from contextlib import closing, contextmanager, nullcontext, suppress +from contextlib import closing, contextmanager, suppress from copy import deepcopy from datetime import datetime from functools import lru_cache @@ -35,14 +35,12 @@ from typing import Any, Callable, cast, Optional, TYPE_CHECKING import numpy import pandas as pd import sqlalchemy as sqla -import sshtunnel from flask import current_app as app, g, has_app_context from flask_appbuilder import Model from marshmallow.exceptions import ValidationError from sqlalchemy import ( Boolean, Column, - create_engine, DateTime, ForeignKey, Integer, @@ -57,7 +55,6 @@ from sqlalchemy.engine.url import URL from sqlalchemy.exc import NoSuchModuleError from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.orm import relationship -from sqlalchemy.pool import NullPool from sqlalchemy.schema import UniqueConstraint from sqlalchemy.sql import ColumnElement, expression, Select from superset_core.api.models import Database as CoreDatabase @@ -72,7 +69,6 @@ from superset.extensions import ( encrypted_field_factory, event_logger, security_manager, - ssh_manager_factory, ) from superset.models.helpers import AuditMixinNullable, ImportExportMixin, UUIDMixin from superset.result_set import SupersetResultSet @@ -84,10 +80,9 @@ from superset.superset_typing import ( ) from superset.utils import cache as cache_util, core as utils, json from superset.utils.backports import StrEnum -from superset.utils.core import get_query_source_from_request, get_username +from superset.utils.core import get_username from superset.utils.oauth2 import ( check_for_oauth2, - get_oauth2_access_token, OAuth2ClientConfigSchema, ) @@ -141,7 +136,9 @@ class ConfigurationMethod(StrEnum): DYNAMIC_FORM = "dynamic_form" -class Database(CoreDatabase, AuditMixinNullable, ImportExportMixin): # pylint: disable=too-many-public-methods +class Database( + CoreDatabase, AuditMixinNullable, ImportExportMixin +): # pylint: disable=too-many-public-methods """An ORM object that stores Database related information""" __tablename__ = "dbs" @@ -418,137 +415,38 @@ class Database(CoreDatabase, AuditMixinNullable, ImportExportMixin): # pylint: return ( username if (username := get_username()) - else object_url.username - if self.impersonate_user - else None + else object_url.username if self.impersonate_user else None ) @contextmanager - def get_sqla_engine( # pylint: disable=too-many-arguments + def get_sqla_engine( self, catalog: str | None = None, schema: str | None = None, - nullpool: bool = True, source: utils.QuerySource | None = None, ) -> Engine: """ Context manager for a SQLAlchemy engine. - This method will return a context manager for a SQLAlchemy engine. Using the - context manager (as opposed to the engine directly) is important because we need - to potentially establish SSH tunnels before the connection is created, and clean - them up once the engine is no longer used. - """ - - sqlalchemy_uri = self.sqlalchemy_uri_decrypted - - ssh_context_manager = ( - ssh_manager_factory.instance.create_tunnel( - ssh_tunnel=self.ssh_tunnel, - sqlalchemy_database_uri=sqlalchemy_uri, - ) - if self.ssh_tunnel - else nullcontext() - ) - - with ssh_context_manager as ssh_context: - if ssh_context: - logger.info( - "[SSH] Successfully created tunnel w/ %s tunnel_timeout + %s " - "ssh_timeout at %s", - sshtunnel.TUNNEL_TIMEOUT, - sshtunnel.SSH_TIMEOUT, - ssh_context.local_bind_address, - ) - sqlalchemy_uri = ssh_manager_factory.instance.build_sqla_url( - sqlalchemy_uri, - ssh_context, - ) + This method will return a context manager for a SQLAlchemy engine. The engine + manager handles connection pooling, SSH tunnels, and other connection details + based on the configured mode (NEW or SINGLETON). - engine_context_manager = app.config["ENGINE_CONTEXT_MANAGER"] - with engine_context_manager(self, catalog, schema): - with check_for_oauth2(self): - yield self._get_sqla_engine( - catalog=catalog, - schema=schema, - nullpool=nullpool, - source=source, - sqlalchemy_uri=sqlalchemy_uri, - ) + Note: The nullpool parameter is kept for backwards compatibility but is ignored. + Pool configuration is now read from the database's extra configuration. + """ + # Import here to avoid circular imports + from superset.extensions import engine_manager_extension - def _get_sqla_engine( # pylint: disable=too-many-locals # noqa: C901 - self, - catalog: str | None = None, - schema: str | None = None, - nullpool: bool = True, - source: utils.QuerySource | None = None, - sqlalchemy_uri: str | None = None, - ) -> Engine: - sqlalchemy_url = make_url_safe( - sqlalchemy_uri if sqlalchemy_uri else self.sqlalchemy_uri_decrypted - ) - self.db_engine_spec.validate_database_uri(sqlalchemy_url) - - extra = self.get_extra(source) - engine_kwargs = extra.get("engine_params", {}) - if nullpool: - engine_kwargs["poolclass"] = NullPool - connect_args = engine_kwargs.setdefault("connect_args", {}) - - # modify URL/args for a specific catalog/schema - sqlalchemy_url, connect_args = self.db_engine_spec.adjust_engine_params( - uri=sqlalchemy_url, - connect_args=connect_args, + # Use the engine manager to get the engine + engine_manager = engine_manager_extension.manager + return engine_manager.get_engine( + database=self, catalog=catalog, schema=schema, + source=source, ) - effective_username = self.get_effective_user(sqlalchemy_url) - if effective_username and is_feature_enabled("IMPERSONATE_WITH_EMAIL_PREFIX"): - user = security_manager.find_user(username=effective_username) - if user and user.email: - effective_username = user.email.split("@")[0] - - oauth2_config = self.get_oauth2_config() - access_token = ( - get_oauth2_access_token( - oauth2_config, - self.id, - g.user.id, - self.db_engine_spec, - ) - if oauth2_config and hasattr(g, "user") and hasattr(g.user, "id") - else None - ) - masked_url = self.get_password_masked_url(sqlalchemy_url) - logger.debug("Database._get_sqla_engine(). Masked URL: %s", str(masked_url)) - - if self.impersonate_user: - sqlalchemy_url, engine_kwargs = self.db_engine_spec.impersonate_user( - self, - effective_username, - access_token, - sqlalchemy_url, - engine_kwargs, - ) - - self.update_params_from_encrypted_extra(engine_kwargs) - - if DB_CONNECTION_MUTATOR := app.config["DB_CONNECTION_MUTATOR"]: # noqa: N806 - source = source or get_query_source_from_request() - - sqlalchemy_url, engine_kwargs = DB_CONNECTION_MUTATOR( - sqlalchemy_url, - engine_kwargs, - effective_username, - security_manager, - source, - ) - try: - return create_engine(sqlalchemy_url, **engine_kwargs) - except Exception as ex: - raise self.db_engine_spec.get_dbapi_mapped_exception(ex) from ex - def add_database_to_signature( self, func: Callable[..., None], @@ -572,13 +470,12 @@ class Database(CoreDatabase, AuditMixinNullable, ImportExportMixin): # pylint: self, catalog: str | None = None, schema: str | None = None, - nullpool: bool = True, + nullpool: bool = True, # Kept for backwards compatibility, but ignored source: utils.QuerySource | None = None, ) -> Connection: with self.get_sqla_engine( catalog=catalog, schema=schema, - nullpool=nullpool, source=source, ) as engine: with check_for_oauth2(self):
