This is an automated email from the ASF dual-hosted git repository.
rusackas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/superset.git
The following commit(s) were added to refs/heads/master by this push:
new fac5d2bcb6 feat(db): add dynamic schema support for athena (#36003)
fac5d2bcb6 is described below
commit fac5d2bcb6502d2bbc0af6c0a3b427fba460f869
Author: Igor Shmulyan <[email protected]>
AuthorDate: Mon Jan 12 20:58:12 2026 +0200
feat(db): add dynamic schema support for athena (#36003)
---
superset/db_engine_specs/athena.py | 40 ++++++++++++++
tests/unit_tests/db_engine_specs/test_athena.py | 70 +++++++++++++++++++++++++
2 files changed, 110 insertions(+)
diff --git a/superset/db_engine_specs/athena.py
b/superset/db_engine_specs/athena.py
index a3abfdf109..002790da0e 100644
--- a/superset/db_engine_specs/athena.py
+++ b/superset/db_engine_specs/athena.py
@@ -21,6 +21,7 @@ from typing import Any, Optional
from flask_babel import gettext as __
from sqlalchemy import types
+from sqlalchemy.engine.url import URL
from superset.constants import TimeGrain
from superset.db_engine_specs.base import BaseEngineSpec
@@ -38,6 +39,7 @@ class AthenaEngineSpec(BaseEngineSpec):
disable_ssh_tunneling = True
# Athena doesn't support IS true/false syntax, use = true/false instead
use_equality_for_boolean_filters = True
+ supports_dynamic_schema = True
_time_grain_expressions = {
None: "{col}",
@@ -92,3 +94,41 @@ class AthenaEngineSpec(BaseEngineSpec):
:return: Conditionally mutated label
"""
return label.lower()
+
+ @classmethod
+ def adjust_engine_params(
+ cls,
+ uri: URL,
+ connect_args: dict[str, Any],
+ catalog: str | None = None,
+ schema: str | None = None,
+ ) -> tuple[URL, dict[str, Any]]:
+ """
+ Adjust the SQLAlchemy URI for Athena with a provided catalog and
schema.
+
+ For AWS Athena the SQLAlchemy URI looks like this:
+
+
awsathena+rest://athena.{region_name}.amazonaws.com:443/{schema_name}?catalog_name={catalog_name}&s3_staging_dir={s3_staging_dir}
+ """
+ if catalog:
+ uri = uri.update_query_dict({"catalog_name": catalog})
+
+ if schema:
+ uri = uri.set(database=schema)
+
+ return uri, connect_args
+
+ @classmethod
+ def get_schema_from_engine_params(
+ cls,
+ sqlalchemy_uri: URL,
+ connect_args: dict[str, Any],
+ ) -> str | None:
+ """
+ Return the configured schema.
+
+ For AWS Athena the SQLAlchemy URI looks like this:
+
+
awsathena+rest://athena.{region_name}.amazonaws.com:443/{schema_name}?catalog_name={catalog_name}&s3_staging_dir={s3_staging_dir}
+ """
+ return sqlalchemy_uri.database
diff --git a/tests/unit_tests/db_engine_specs/test_athena.py
b/tests/unit_tests/db_engine_specs/test_athena.py
index 9e571eceb2..d205572d2c 100644
--- a/tests/unit_tests/db_engine_specs/test_athena.py
+++ b/tests/unit_tests/db_engine_specs/test_athena.py
@@ -20,6 +20,7 @@ from datetime import datetime
from typing import Optional
import pytest
+from sqlalchemy.engine.url import make_url
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from tests.unit_tests.db_engine_specs.utils import assert_convert_dttm
@@ -120,3 +121,72 @@ def test_handle_boolean_filter() -> None:
str(result_false.compile(compile_kwargs={"literal_binds": True}))
== "test_col = false"
)
+
+
+def test_adjust_engine_params() -> None:
+ """
+ Test `adjust_engine_params`.
+
+ The method can be used to adjust the schema dynamically.
+ """
+ from superset.db_engine_specs.athena import AthenaEngineSpec
+
+ url =
make_url("awsathena+rest://athena.us-east-1.amazonaws.com:443/default")
+
+ uri = AthenaEngineSpec.adjust_engine_params(url, {})[0]
+ assert str(uri) ==
"awsathena+rest://athena.us-east-1.amazonaws.com:443/default"
+
+ uri = AthenaEngineSpec.adjust_engine_params(
+ url,
+ {},
+ schema="new_schema",
+ )[0]
+ assert str(uri) ==
"awsathena+rest://athena.us-east-1.amazonaws.com:443/new_schema"
+
+ uri = AthenaEngineSpec.adjust_engine_params(
+ url,
+ {},
+ catalog="new_catalog",
+ )[0]
+ assert (
+ str(uri)
+ ==
"awsathena+rest://athena.us-east-1.amazonaws.com:443/default?catalog_name=new_catalog"
+ )
+
+ uri = AthenaEngineSpec.adjust_engine_params(
+ url,
+ {},
+ catalog="new_catalog",
+ schema="new_schema",
+ )[0]
+ assert (
+ str(uri)
+ ==
"awsathena+rest://athena.us-east-1.amazonaws.com:443/new_schema?catalog_name=new_catalog"
+ )
+
+
+def test_get_schema_from_engine_params() -> None:
+ """
+ Test the ``get_schema_from_engine_params`` method.
+ """
+ from superset.db_engine_specs.athena import AthenaEngineSpec
+
+ assert (
+ AthenaEngineSpec.get_schema_from_engine_params(
+ make_url(
+
"awsathena+rest://athena.us-east-1.amazonaws.com:443/default?s3_staging_dir=s3%3A%2F%2Fathena-staging"
+ ),
+ {},
+ )
+ == "default"
+ )
+
+ assert (
+ AthenaEngineSpec.get_schema_from_engine_params(
+ make_url(
+
"awsathena+rest://athena.us-east-1.amazonaws.com:443?s3_staging_dir=s3%3A%2F%2Fathena-staging"
+ ),
+ {},
+ )
+ is None
+ )