This is an automated email from the ASF dual-hosted git repository.
elizabeth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/superset.git
The following commit(s) were added to refs/heads/master by this push:
new 4147d877fc1 fix(mcp): prevent DATE_TRUNC on non-temporal columns in
chart generation (#37433)
4147d877fc1 is described below
commit 4147d877fc1b8f48e23f26685f112b91d4d261d1
Author: Amin Ghadersohi <[email protected]>
AuthorDate: Thu Feb 5 10:24:31 2026 -0700
fix(mcp): prevent DATE_TRUNC on non-temporal columns in chart generation
(#37433)
---
superset/mcp_service/chart/chart_utils.py | 157 ++++++++++--
superset/mcp_service/chart/tool/generate_chart.py | 5 +-
superset/mcp_service/chart/tool/update_chart.py | 4 +-
.../mcp_service/chart/tool/update_chart_preview.py | 5 +-
.../explore/tool/generate_explore_link.py | 5 +-
.../mcp_service/chart/test_chart_utils.py | 281 ++++++++++++++++++++-
.../explore/tool/test_generate_explore_link.py | 23 +-
7 files changed, 449 insertions(+), 31 deletions(-)
diff --git a/superset/mcp_service/chart/chart_utils.py
b/superset/mcp_service/chart/chart_utils.py
index 4597ac55b50..748e770a4d8 100644
--- a/superset/mcp_service/chart/chart_utils.py
+++ b/superset/mcp_service/chart/chart_utils.py
@@ -22,6 +22,7 @@ This module contains shared logic for chart configuration
mapping and explore li
generation that can be used by both generate_chart and generate_explore_link
tools.
"""
+import logging
from typing import Any, Dict
from superset.mcp_service.chart.schemas import (
@@ -34,24 +35,26 @@ from superset.mcp_service.chart.schemas import (
from superset.mcp_service.utils.url_utils import get_superset_base_url
from superset.utils import json
+logger = logging.getLogger(__name__)
+
def generate_explore_link(dataset_id: int | str, form_data: Dict[str, Any]) ->
str:
"""Generate an explore link for the given dataset and form data."""
+ from sqlalchemy.exc import SQLAlchemyError
+
+ from superset.commands.exceptions import CommandException
+ from superset.commands.explore.form_data.parameters import
CommandParameters
+ from superset.daos.dataset import DatasetDAO
+ from superset.mcp_service.commands.create_form_data import (
+ MCPCreateFormDataCommand,
+ )
+ from superset.utils.core import DatasourceType
+
base_url = get_superset_base_url()
numeric_dataset_id = None
+ dataset = None
try:
- from superset.commands.explore.form_data.parameters import
CommandParameters
-
- # Find the dataset to get its numeric ID
- from superset.daos.dataset import DatasetDAO
- from superset.mcp_service.commands.create_form_data import (
- MCPCreateFormDataCommand,
- )
- from superset.utils.core import DatasourceType
-
- dataset = None
-
if isinstance(dataset_id, int) or (
isinstance(dataset_id, str) and dataset_id.isdigit()
):
@@ -92,27 +95,102 @@ def generate_explore_link(dataset_id: int | str,
form_data: Dict[str, Any]) -> s
# Return URL with just the form_data_key
return f"{base_url}/explore/?form_data_key={form_data_key}"
- except Exception:
+ except (CommandException, SQLAlchemyError, ValueError, AttributeError):
# Fallback to basic explore URL with numeric ID if available
if numeric_dataset_id is not None:
return (
f"{base_url}/explore/?datasource_type=table"
f"&datasource_id={numeric_dataset_id}"
)
+ return
f"{base_url}/explore/?datasource_type=table&datasource_id={dataset_id}"
+
+
+def is_column_truly_temporal(column_name: str, dataset_id: int | str | None)
-> bool:
+ """
+ Check if a column is truly temporal based on its SQL data type.
+
+ This is important because Superset may mark columns as is_dttm=True based
on
+ column name heuristics (e.g., "year", "month"), but if the actual SQL type
is
+ BIGINT or INTEGER, DATE_TRUNC will fail.
+
+ Uses the database engine spec's column type mapping to determine the actual
+ GenericDataType, bypassing the is_dttm flag which may be set incorrectly.
+
+ Args:
+ column_name: Name of the column to check
+ dataset_id: Dataset ID to look up column metadata
+
+ Returns:
+ True if the column has a real temporal SQL type, False otherwise
+ """
+ from superset.daos.dataset import DatasetDAO
+ from superset.utils.core import GenericDataType
+
+ if not dataset_id:
+ return True # Default to temporal if we can't check (backward
compatible)
+
+ try:
+ # Find dataset
+ if isinstance(dataset_id, int) or (
+ isinstance(dataset_id, str) and dataset_id.isdigit()
+ ):
+ dataset = DatasetDAO.find_by_id(int(dataset_id))
else:
- return (
-
f"{base_url}/explore/?datasource_type=table&datasource_id={dataset_id}"
- )
+ dataset = DatasetDAO.find_by_id(dataset_id, id_column="uuid")
+
+ if not dataset:
+ return True # Default to temporal if dataset not found
+
+ # Find the column and check its actual type using db_engine_spec
+ column_lower = column_name.lower()
+ for col in dataset.columns:
+ if col.column_name.lower() == column_lower:
+ col_type = col.type
+ if not col_type:
+ # No type info, trust is_dttm flag
+ return getattr(col, "is_dttm", False)
+
+ # Use the db_engine_spec to get the actual GenericDataType
+ # This bypasses the is_dttm flag and checks the real SQL type
+ db_engine_spec = dataset.database.db_engine_spec
+ column_spec = db_engine_spec.get_column_spec(col_type)
+
+ if column_spec:
+ is_temporal = column_spec.generic_type ==
GenericDataType.TEMPORAL
+ if not is_temporal:
+ logger.debug(
+ "Column '%s' has type '%s' (generic: %s), "
+ "treating as non-temporal",
+ column_name,
+ col_type,
+ column_spec.generic_type,
+ )
+ return is_temporal
+
+ # If no column_spec, trust is_dttm flag
+ return getattr(col, "is_dttm", False)
+
+ return True # Default if column not found
+
+ except (ValueError, AttributeError) as e:
+ logger.warning(
+ "Error checking column type for '%s' in dataset %s: %s",
+ column_name,
+ dataset_id,
+ e,
+ )
+ return True # Default to temporal on error (backward compatible)
def map_config_to_form_data(
config: TableChartConfig | XYChartConfig,
+ dataset_id: int | str | None = None,
) -> Dict[str, Any]:
"""Map chart config to Superset form_data."""
if isinstance(config, TableChartConfig):
return map_table_config(config)
elif isinstance(config, XYChartConfig):
- return map_xy_config(config)
+ return map_xy_config(config, dataset_id=dataset_id)
else:
raise ValueError(f"Unsupported config type: {type(config)}")
@@ -259,13 +337,41 @@ def add_legend_config(form_data: Dict[str, Any], config:
XYChartConfig) -> None:
form_data["legend_orientation"] = config.legend.position
-def map_xy_config(config: XYChartConfig) -> Dict[str, Any]:
+def configure_temporal_handling(
+ form_data: Dict[str, Any],
+ x_is_temporal: bool,
+ time_grain: str | None,
+) -> None:
+ """Configure form_data based on whether x-axis column is temporal.
+
+ For temporal columns, enables standard time series handling.
+ For non-temporal columns (e.g., BIGINT year), disables DATE_TRUNC
+ by setting categorical sorting options.
+ """
+ if x_is_temporal:
+ if time_grain:
+ form_data["time_grain_sqla"] = time_grain
+ else:
+ # Non-temporal column - disable temporal handling to prevent DATE_TRUNC
+ form_data["x_axis_sort_series_type"] = "name"
+ form_data["x_axis_sort_series_ascending"] = True
+ form_data["time_grain_sqla"] = None
+ form_data["granularity_sqla"] = None
+
+
+def map_xy_config(
+ config: XYChartConfig, dataset_id: int | str | None = None
+) -> Dict[str, Any]:
"""Map XY chart config to form_data with defensive validation."""
# Early validation to prevent empty charts
if not config.y:
raise ValueError("XY chart must have at least one Y-axis metric")
- # Map chart kind to viz_type
+ # Check if x-axis column is truly temporal (based on actual SQL type)
+ x_is_temporal = is_column_truly_temporal(config.x.name, dataset_id)
+
+ # Map chart kind to viz_type - always use the same viz types
+ # The temporal vs non-temporal handling is done via form_data configuration
viz_type_map = {
"line": "echarts_timeseries_line",
"bar": "echarts_timeseries_bar",
@@ -273,6 +379,14 @@ def map_xy_config(config: XYChartConfig) -> Dict[str, Any]:
"scatter": "echarts_timeseries_scatter",
}
+ if not x_is_temporal:
+ logger.info(
+ "X-axis column '%s' is not temporal (dataset_id=%s), "
+ "configuring as categorical dimension",
+ config.x.name,
+ dataset_id,
+ )
+
# Convert Y columns to metrics with validation
metrics = []
for col in config.y:
@@ -286,13 +400,12 @@ def map_xy_config(config: XYChartConfig) -> Dict[str,
Any]:
form_data: Dict[str, Any] = {
"viz_type": viz_type_map.get(config.kind, "echarts_timeseries_line"),
- "x_axis": config.x.name,
"metrics": metrics,
+ "x_axis": config.x.name,
}
- # Add time grain if specified (for temporal x-axis columns)
- if config.time_grain:
- form_data["time_grain_sqla"] = config.time_grain
+ # Configure temporal handling based on whether column is truly temporal
+ configure_temporal_handling(form_data, x_is_temporal, config.time_grain)
# CRITICAL FIX: For time series charts, handle groupby carefully to avoid
duplicates
# The x_axis field already tells Superset which column to use for time
grouping
diff --git a/superset/mcp_service/chart/tool/generate_chart.py
b/superset/mcp_service/chart/tool/generate_chart.py
index bb8157adf4d..23eb23c01e5 100644
--- a/superset/mcp_service/chart/tool/generate_chart.py
+++ b/superset/mcp_service/chart/tool/generate_chart.py
@@ -174,7 +174,10 @@ async def generate_chart( # noqa: C901
)
# Map the simplified config to Superset's form_data format
- form_data = map_config_to_form_data(request.config)
+ # Pass dataset_id to enable column type checking for proper viz_type
selection
+ form_data = map_config_to_form_data(
+ request.config, dataset_id=request.dataset_id
+ )
chart = None
chart_id = None
diff --git a/superset/mcp_service/chart/tool/update_chart.py
b/superset/mcp_service/chart/tool/update_chart.py
index ed5355906ff..a1b720f380d 100644
--- a/superset/mcp_service/chart/tool/update_chart.py
+++ b/superset/mcp_service/chart/tool/update_chart.py
@@ -125,7 +125,9 @@ async def update_chart(
)
# Map the new config to form_data format
- new_form_data = map_config_to_form_data(request.config)
+ # Get dataset_id from existing chart for column type checking
+ dataset_id = chart.datasource_id if chart.datasource_id else None
+ new_form_data = map_config_to_form_data(request.config,
dataset_id=dataset_id)
# Update chart using Superset's command
from superset.commands.chart.update import UpdateChartCommand
diff --git a/superset/mcp_service/chart/tool/update_chart_preview.py
b/superset/mcp_service/chart/tool/update_chart_preview.py
index 271709e230f..16893fcfbd0 100644
--- a/superset/mcp_service/chart/tool/update_chart_preview.py
+++ b/superset/mcp_service/chart/tool/update_chart_preview.py
@@ -66,7 +66,10 @@ def update_chart_preview(
try:
# Map the new config to form_data format
- new_form_data = map_config_to_form_data(request.config)
+ # Pass dataset_id to enable column type checking for proper viz_type
selection
+ new_form_data = map_config_to_form_data(
+ request.config, dataset_id=request.dataset_id
+ )
# Generate new explore link with updated form_data
explore_url = generate_explore_link(request.dataset_id, new_form_data)
diff --git a/superset/mcp_service/explore/tool/generate_explore_link.py
b/superset/mcp_service/explore/tool/generate_explore_link.py
index a2deefbb272..d1f07daf929 100644
--- a/superset/mcp_service/explore/tool/generate_explore_link.py
+++ b/superset/mcp_service/explore/tool/generate_explore_link.py
@@ -92,7 +92,10 @@ async def generate_explore_link(
try:
await ctx.report_progress(1, 3, "Converting configuration to form
data")
# Map config to form_data using shared utilities
- form_data = map_config_to_form_data(request.config)
+ # Pass dataset_id to enable column type checking for proper viz_type
selection
+ form_data = map_config_to_form_data(
+ request.config, dataset_id=request.dataset_id
+ )
# Add datasource to form_data for consistency with generate_chart
# Only set if not already present to avoid overwriting
diff --git a/tests/unit_tests/mcp_service/chart/test_chart_utils.py
b/tests/unit_tests/mcp_service/chart/test_chart_utils.py
index f2e9d966c89..eab027c3dca 100644
--- a/tests/unit_tests/mcp_service/chart/test_chart_utils.py
+++ b/tests/unit_tests/mcp_service/chart/test_chart_utils.py
@@ -17,14 +17,17 @@
"""Tests for chart utilities module"""
+from typing import Any
from unittest.mock import patch
import pytest
from superset.mcp_service.chart.chart_utils import (
+ configure_temporal_handling,
create_metric_object,
generate_chart_name,
generate_explore_link,
+ is_column_truly_temporal,
map_config_to_form_data,
map_filter_operator,
map_table_config,
@@ -38,6 +41,7 @@ from superset.mcp_service.chart.schemas import (
TableChartConfig,
XYChartConfig,
)
+from superset.utils.core import GenericDataType
class TestCreateMetricObject:
@@ -423,7 +427,9 @@ class TestGenerateExploreLink:
mock_get_base_url.return_value = "https://superset.example.com"
form_data = {"viz_type": "table", "metrics": ["count"]}
- result = generate_explore_link("123", form_data)
+ # Mock dataset not found to trigger fallback URL
+ with patch("superset.daos.dataset.DatasetDAO.find_by_id",
return_value=None):
+ result = generate_explore_link("123", form_data)
# Should use the configured base URL - use urlparse to avoid CodeQL
warning
parsed_url = urlparse(result)
@@ -471,13 +477,15 @@ class TestGenerateExploreLink:
@patch("superset.mcp_service.chart.chart_utils.get_superset_base_url")
def test_generate_explore_link_exception_handling(self, mock_get_base_url)
-> None:
- """Test generate_explore_link handles exceptions gracefully"""
+ """Test generate_explore_link handles SQLAlchemy exceptions
gracefully"""
+ from sqlalchemy.exc import SQLAlchemyError
+
mock_get_base_url.return_value = "http://localhost:9001"
- # Mock exception during form_data creation
+ # Mock SQLAlchemy exception during dataset lookup
with patch(
"superset.daos.dataset.DatasetDAO.find_by_id",
- side_effect=Exception("DB error"),
+ side_effect=SQLAlchemyError("DB error"),
):
result = generate_explore_link("123", {"viz_type": "table"})
@@ -605,3 +613,268 @@ class TestCriticalBugFixes:
form_data = map_xy_config(config)
assert form_data["viz_type"] == expected_viz_type
+
+
+class TestIsColumnTrulyTemporal:
+ """Test is_column_truly_temporal function using db_engine_spec"""
+
+ def _create_mock_dataset(
+ self,
+ column_name: str,
+ column_type: str,
+ generic_type: GenericDataType,
+ ):
+ """Helper to create a mock dataset with proper db_engine_spec"""
+ from unittest.mock import MagicMock
+
+ from superset.utils.core import ColumnSpec
+
+ mock_column = MagicMock()
+ mock_column.column_name = column_name
+ mock_column.type = column_type
+
+ mock_db_engine_spec = MagicMock()
+ mock_column_spec = ColumnSpec(
+ sqla_type=MagicMock(), generic_type=generic_type, is_dttm=False
+ )
+ mock_db_engine_spec.get_column_spec.return_value = mock_column_spec
+
+ mock_database = MagicMock()
+ mock_database.db_engine_spec = mock_db_engine_spec
+
+ mock_dataset = MagicMock()
+ mock_dataset.columns = [mock_column]
+ mock_dataset.database = mock_database
+
+ return mock_dataset
+
+ def test_returns_true_when_no_dataset_id(self) -> None:
+ """Test returns True (default) when dataset_id is None"""
+ result = is_column_truly_temporal("year", None)
+ assert result is True
+
+ @patch("superset.daos.dataset.DatasetDAO")
+ def test_returns_true_when_dataset_not_found(self, mock_dao) -> None:
+ """Test returns True when dataset is not found"""
+ mock_dao.find_by_id.return_value = None
+ result = is_column_truly_temporal("year", 123)
+ assert result is True
+
+ @patch("superset.daos.dataset.DatasetDAO")
+ def test_returns_false_for_numeric_column(self, mock_dao) -> None:
+ """Test returns False for NUMERIC generic type (e.g., BIGINT)"""
+ mock_dataset = self._create_mock_dataset(
+ "year", "BIGINT", GenericDataType.NUMERIC
+ )
+ mock_dao.find_by_id.return_value = mock_dataset
+
+ result = is_column_truly_temporal("year", 123)
+ assert result is False
+
+ @patch("superset.daos.dataset.DatasetDAO")
+ def test_returns_false_for_integer_column(self, mock_dao) -> None:
+ """Test returns False for INTEGER column (NUMERIC generic type)"""
+ mock_dataset = self._create_mock_dataset(
+ "month", "INTEGER", GenericDataType.NUMERIC
+ )
+ mock_dao.find_by_id.return_value = mock_dataset
+
+ result = is_column_truly_temporal("month", 123)
+ assert result is False
+
+ @patch("superset.daos.dataset.DatasetDAO")
+ def test_returns_true_for_temporal_column(self, mock_dao) -> None:
+ """Test returns True for TEMPORAL generic type (e.g., TIMESTAMP)"""
+ mock_dataset = self._create_mock_dataset(
+ "created_at", "TIMESTAMP", GenericDataType.TEMPORAL
+ )
+ mock_dao.find_by_id.return_value = mock_dataset
+
+ result = is_column_truly_temporal("created_at", 123)
+ assert result is True
+
+ @patch("superset.daos.dataset.DatasetDAO")
+ def test_returns_true_for_date_column(self, mock_dao) -> None:
+ """Test returns True for DATE column (TEMPORAL generic type)"""
+ mock_dataset = self._create_mock_dataset(
+ "order_date", "DATE", GenericDataType.TEMPORAL
+ )
+ mock_dao.find_by_id.return_value = mock_dataset
+
+ result = is_column_truly_temporal("order_date", 123)
+ assert result is True
+
+ @patch("superset.daos.dataset.DatasetDAO")
+ def test_case_insensitive_column_name_lookup(self, mock_dao) -> None:
+ """Test column name lookup is case insensitive"""
+ mock_dataset = self._create_mock_dataset(
+ "Year", "BIGINT", GenericDataType.NUMERIC
+ )
+ mock_dao.find_by_id.return_value = mock_dataset
+
+ result = is_column_truly_temporal("year", 123)
+ assert result is False
+
+ @patch("superset.daos.dataset.DatasetDAO")
+ def test_returns_true_on_value_error(self, mock_dao) -> None:
+ """Test returns True (default) when ValueError occurs"""
+ mock_dao.find_by_id.side_effect = ValueError("Invalid ID")
+ result = is_column_truly_temporal("year", 123)
+ assert result is True
+
+ @patch("superset.daos.dataset.DatasetDAO")
+ def test_returns_true_on_attribute_error(self, mock_dao) -> None:
+ """Test returns True (default) when AttributeError occurs"""
+ mock_dao.find_by_id.side_effect = AttributeError("Missing attribute")
+ result = is_column_truly_temporal("year", 123)
+ assert result is True
+
+ @patch("superset.daos.dataset.DatasetDAO")
+ def test_handles_uuid_dataset_id(self, mock_dao) -> None:
+ """Test handles UUID string as dataset_id"""
+ mock_dataset = self._create_mock_dataset(
+ "year", "BIGINT", GenericDataType.NUMERIC
+ )
+ mock_dao.find_by_id.return_value = mock_dataset
+
+ result = is_column_truly_temporal("year", "abc-123-uuid")
+ assert result is False
+ mock_dao.find_by_id.assert_called_with("abc-123-uuid",
id_column="uuid")
+
+ @patch("superset.daos.dataset.DatasetDAO")
+ def test_falls_back_to_is_dttm_when_no_column_spec(self, mock_dao) -> None:
+ """Test falls back to is_dttm flag when get_column_spec returns None"""
+ from unittest.mock import MagicMock
+
+ mock_column = MagicMock()
+ mock_column.column_name = "year"
+ mock_column.type = "UNKNOWN_TYPE"
+ mock_column.is_dttm = False
+
+ mock_db_engine_spec = MagicMock()
+ mock_db_engine_spec.get_column_spec.return_value = None
+
+ mock_database = MagicMock()
+ mock_database.db_engine_spec = mock_db_engine_spec
+
+ mock_dataset = MagicMock()
+ mock_dataset.columns = [mock_column]
+ mock_dataset.database = mock_database
+ mock_dao.find_by_id.return_value = mock_dataset
+
+ result = is_column_truly_temporal("year", 123)
+ assert result is False
+
+ @patch("superset.daos.dataset.DatasetDAO")
+ def test_falls_back_to_is_dttm_when_no_type(self, mock_dao) -> None:
+ """Test falls back to is_dttm flag when column has no type"""
+ from unittest.mock import MagicMock
+
+ mock_column = MagicMock()
+ mock_column.column_name = "year"
+ mock_column.type = None
+ mock_column.is_dttm = True
+
+ mock_dataset = MagicMock()
+ mock_dataset.columns = [mock_column]
+ mock_dao.find_by_id.return_value = mock_dataset
+
+ result = is_column_truly_temporal("year", 123)
+ assert result is True
+
+
+class TestConfigureTemporalHandling:
+ """Test configure_temporal_handling function"""
+
+ def test_temporal_column_with_time_grain(self) -> None:
+ """Test temporal column sets time_grain_sqla"""
+ form_data: dict[str, Any] = {}
+ configure_temporal_handling(form_data, x_is_temporal=True,
time_grain="P1M")
+ assert form_data["time_grain_sqla"] == "P1M"
+
+ def test_temporal_column_without_time_grain(self) -> None:
+ """Test temporal column without time_grain doesn't set
time_grain_sqla"""
+ form_data: dict[str, Any] = {}
+ configure_temporal_handling(form_data, x_is_temporal=True,
time_grain=None)
+ assert "time_grain_sqla" not in form_data
+
+ def test_non_temporal_column_sets_categorical_config(self) -> None:
+ """Test non-temporal column sets categorical configuration"""
+ form_data: dict[str, Any] = {}
+ configure_temporal_handling(form_data, x_is_temporal=False,
time_grain=None)
+
+ assert form_data["x_axis_sort_series_type"] == "name"
+ assert form_data["x_axis_sort_series_ascending"] is True
+ assert form_data["time_grain_sqla"] is None
+ assert form_data["granularity_sqla"] is None
+
+ def test_non_temporal_column_ignores_time_grain(self) -> None:
+ """Test non-temporal column ignores time_grain parameter"""
+ form_data: dict[str, Any] = {}
+ configure_temporal_handling(form_data, x_is_temporal=False,
time_grain="P1M")
+
+ # Should still set categorical config, not time_grain
+ assert form_data["time_grain_sqla"] is None
+ assert form_data["x_axis_sort_series_type"] == "name"
+
+
+class TestMapXYConfigWithNonTemporalColumn:
+ """Test map_xy_config with non-temporal columns (DATE_TRUNC fix)"""
+
+ @patch("superset.mcp_service.chart.chart_utils.is_column_truly_temporal")
+ def test_non_temporal_column_disables_time_grain(self, mock_is_temporal)
-> None:
+ """Test non-temporal column sets categorical config"""
+ mock_is_temporal.return_value = False
+
+ config = XYChartConfig(
+ chart_type="xy",
+ x=ColumnRef(name="year"),
+ y=[ColumnRef(name="sales", aggregate="SUM")],
+ kind="bar",
+ )
+
+ result = map_xy_config(config, dataset_id=123)
+
+ assert result["x_axis"] == "year"
+ assert result["x_axis_sort_series_type"] == "name"
+ assert result["x_axis_sort_series_ascending"] is True
+ assert result["time_grain_sqla"] is None
+ assert result["granularity_sqla"] is None
+
+ @patch("superset.mcp_service.chart.chart_utils.is_column_truly_temporal")
+ def test_temporal_column_allows_time_grain(self, mock_is_temporal) -> None:
+ """Test temporal column allows time_grain to be set"""
+ mock_is_temporal.return_value = True
+
+ config = XYChartConfig(
+ chart_type="xy",
+ x=ColumnRef(name="created_at"),
+ y=[ColumnRef(name="count", aggregate="COUNT")],
+ kind="line",
+ time_grain="P1W",
+ )
+
+ result = map_xy_config(config, dataset_id=123)
+
+ assert result["x_axis"] == "created_at"
+ assert result["time_grain_sqla"] == "P1W"
+ assert "x_axis_sort_series_type" not in result
+
+ @patch("superset.mcp_service.chart.chart_utils.is_column_truly_temporal")
+ def test_non_temporal_ignores_time_grain_param(self, mock_is_temporal) ->
None:
+ """Test non-temporal column ignores time_grain even if specified"""
+ mock_is_temporal.return_value = False
+
+ config = XYChartConfig(
+ chart_type="xy",
+ x=ColumnRef(name="year"),
+ y=[ColumnRef(name="sales", aggregate="SUM")],
+ kind="bar",
+ time_grain="P1M", # This should be ignored for non-temporal
+ )
+
+ result = map_xy_config(config, dataset_id=123)
+
+ # time_grain_sqla should be None, not P1M
+ assert result["time_grain_sqla"] is None
+ assert result["x_axis_sort_series_type"] == "name"
diff --git
a/tests/unit_tests/mcp_service/explore/tool/test_generate_explore_link.py
b/tests/unit_tests/mcp_service/explore/tool/test_generate_explore_link.py
index 04a249cd9fa..af08834d57b 100644
--- a/tests/unit_tests/mcp_service/explore/tool/test_generate_explore_link.py
+++ b/tests/unit_tests/mcp_service/explore/tool/test_generate_explore_link.py
@@ -68,9 +68,30 @@ def mock_webdriver_baseurl(app_context):
def _mock_dataset(id: int = 1) -> Mock:
- """Create a mock dataset object."""
+ """Create a mock dataset object with columns and db_engine_spec."""
+ from superset.utils.core import ColumnSpec, GenericDataType
+
+ # Create mock column that appears temporal
+ mock_column = Mock()
+ mock_column.column_name = "date"
+ mock_column.type = "TIMESTAMP"
+
+ # Create mock db_engine_spec
+ mock_db_engine_spec = Mock()
+ mock_column_spec = ColumnSpec(
+ sqla_type=Mock(), generic_type=GenericDataType.TEMPORAL, is_dttm=True
+ )
+ mock_db_engine_spec.get_column_spec.return_value = mock_column_spec
+
+ # Create mock database
+ mock_database = Mock()
+ mock_database.db_engine_spec = mock_db_engine_spec
+
+ # Create dataset with all required attributes
dataset = Mock()
dataset.id = id
+ dataset.columns = [mock_column]
+ dataset.database = mock_database
return dataset