This is an automated email from the ASF dual-hosted git repository.

elizabeth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/superset.git


The following commit(s) were added to refs/heads/master by this push:
     new 4147d877fc1 fix(mcp): prevent DATE_TRUNC on non-temporal columns in chart generation (#37433)
4147d877fc1 is described below

commit 4147d877fc1b8f48e23f26685f112b91d4d261d1
Author: Amin Ghadersohi <[email protected]>
AuthorDate: Thu Feb 5 10:24:31 2026 -0700

    fix(mcp): prevent DATE_TRUNC on non-temporal columns in chart generation (#37433)
---
 superset/mcp_service/chart/chart_utils.py          | 157 ++++++++++--
 superset/mcp_service/chart/tool/generate_chart.py  |   5 +-
 superset/mcp_service/chart/tool/update_chart.py    |   4 +-
 .../mcp_service/chart/tool/update_chart_preview.py |   5 +-
 .../explore/tool/generate_explore_link.py          |   5 +-
 .../mcp_service/chart/test_chart_utils.py          | 281 ++++++++++++++++++++-
 .../explore/tool/test_generate_explore_link.py     |  23 +-
 7 files changed, 449 insertions(+), 31 deletions(-)
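
At a glance, the fix threads an optional dataset_id through the config-to-form_data
mapping so the x-axis column's real SQL type can be checked before any time-series
handling is applied. A minimal sketch of the new call shape (signatures taken from
the diff; the config values are a hypothetical example):

    from superset.mcp_service.chart.chart_utils import map_config_to_form_data

    # "year" is stored as BIGINT, so Superset's is_dttm name heuristic is wrong
    form_data = map_config_to_form_data(config, dataset_id=123)

    # For a non-temporal x-axis the mapper now emits categorical settings
    # instead of a time grain, so no DATE_TRUNC reaches the database:
    assert form_data["time_grain_sqla"] is None
    assert form_data["x_axis_sort_series_type"] == "name"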

diff --git a/superset/mcp_service/chart/chart_utils.py b/superset/mcp_service/chart/chart_utils.py
index 4597ac55b50..748e770a4d8 100644
--- a/superset/mcp_service/chart/chart_utils.py
+++ b/superset/mcp_service/chart/chart_utils.py
@@ -22,6 +22,7 @@ This module contains shared logic for chart configuration mapping and explore li
 generation that can be used by both generate_chart and generate_explore_link tools.
 """
 
+import logging
 from typing import Any, Dict
 
 from superset.mcp_service.chart.schemas import (
@@ -34,24 +35,26 @@ from superset.mcp_service.chart.schemas import (
 from superset.mcp_service.utils.url_utils import get_superset_base_url
 from superset.utils import json
 
+logger = logging.getLogger(__name__)
+
 
 def generate_explore_link(dataset_id: int | str, form_data: Dict[str, Any]) -> str:
     """Generate an explore link for the given dataset and form data."""
+    from sqlalchemy.exc import SQLAlchemyError
+
+    from superset.commands.exceptions import CommandException
+    from superset.commands.explore.form_data.parameters import CommandParameters
+    from superset.daos.dataset import DatasetDAO
+    from superset.mcp_service.commands.create_form_data import (
+        MCPCreateFormDataCommand,
+    )
+    from superset.utils.core import DatasourceType
+
     base_url = get_superset_base_url()
     numeric_dataset_id = None
+    dataset = None
 
     try:
-        from superset.commands.explore.form_data.parameters import CommandParameters
-
-        # Find the dataset to get its numeric ID
-        from superset.daos.dataset import DatasetDAO
-        from superset.mcp_service.commands.create_form_data import (
-            MCPCreateFormDataCommand,
-        )
-        from superset.utils.core import DatasourceType
-
-        dataset = None
-
         if isinstance(dataset_id, int) or (
             isinstance(dataset_id, str) and dataset_id.isdigit()
         ):
@@ -92,27 +95,102 @@ def generate_explore_link(dataset_id: int | str, form_data: Dict[str, Any]) -> s
         # Return URL with just the form_data_key
         return f"{base_url}/explore/?form_data_key={form_data_key}"
 
-    except Exception:
+    except (CommandException, SQLAlchemyError, ValueError, AttributeError):
         # Fallback to basic explore URL with numeric ID if available
         if numeric_dataset_id is not None:
             return (
                 f"{base_url}/explore/?datasource_type=table"
                 f"&datasource_id={numeric_dataset_id}"
             )
+        return f"{base_url}/explore/?datasource_type=table&datasource_id={dataset_id}"
+
+
+def is_column_truly_temporal(column_name: str, dataset_id: int | str | None) -> bool:
+    """
+    Check if a column is truly temporal based on its SQL data type.
+
+    This is important because Superset may mark columns as is_dttm=True based on
+    column name heuristics (e.g., "year", "month"), but if the actual SQL type is
+    BIGINT or INTEGER, DATE_TRUNC will fail.
+
+    Uses the database engine spec's column type mapping to determine the actual
+    GenericDataType, bypassing the is_dttm flag which may be set incorrectly.
+
+    Args:
+        column_name: Name of the column to check
+        dataset_id: Dataset ID to look up column metadata
+
+    Returns:
+        True if the column has a real temporal SQL type, False otherwise
+    """
+    from superset.daos.dataset import DatasetDAO
+    from superset.utils.core import GenericDataType
+
+    if not dataset_id:
+        return True  # Default to temporal if we can't check (backward compatible)
+
+    try:
+        # Find dataset
+        if isinstance(dataset_id, int) or (
+            isinstance(dataset_id, str) and dataset_id.isdigit()
+        ):
+            dataset = DatasetDAO.find_by_id(int(dataset_id))
         else:
-            return (
-                f"{base_url}/explore/?datasource_type=table&datasource_id={dataset_id}"
-            )
+            dataset = DatasetDAO.find_by_id(dataset_id, id_column="uuid")
+
+        if not dataset:
+            return True  # Default to temporal if dataset not found
+
+        # Find the column and check its actual type using db_engine_spec
+        column_lower = column_name.lower()
+        for col in dataset.columns:
+            if col.column_name.lower() == column_lower:
+                col_type = col.type
+                if not col_type:
+                    # No type info, trust is_dttm flag
+                    return getattr(col, "is_dttm", False)
+
+                # Use the db_engine_spec to get the actual GenericDataType
+                # This bypasses the is_dttm flag and checks the real SQL type
+                db_engine_spec = dataset.database.db_engine_spec
+                column_spec = db_engine_spec.get_column_spec(col_type)
+
+                if column_spec:
+                    is_temporal = column_spec.generic_type == GenericDataType.TEMPORAL
+                    if not is_temporal:
+                        logger.debug(
+                            "Column '%s' has type '%s' (generic: %s), "
+                            "treating as non-temporal",
+                            column_name,
+                            col_type,
+                            column_spec.generic_type,
+                        )
+                    return is_temporal
+
+                # If no column_spec, trust is_dttm flag
+                return getattr(col, "is_dttm", False)
+
+        return True  # Default if column not found
+
+    except (ValueError, AttributeError) as e:
+        logger.warning(
+            "Error checking column type for '%s' in dataset %s: %s",
+            column_name,
+            dataset_id,
+            e,
+        )
+        return True  # Default to temporal on error (backward compatible)
 
 
 def map_config_to_form_data(
     config: TableChartConfig | XYChartConfig,
+    dataset_id: int | str | None = None,
 ) -> Dict[str, Any]:
     """Map chart config to Superset form_data."""
     if isinstance(config, TableChartConfig):
         return map_table_config(config)
     elif isinstance(config, XYChartConfig):
-        return map_xy_config(config)
+        return map_xy_config(config, dataset_id=dataset_id)
     else:
         raise ValueError(f"Unsupported config type: {type(config)}")
 
@@ -259,13 +337,41 @@ def add_legend_config(form_data: Dict[str, Any], config: XYChartConfig) -> None:
             form_data["legend_orientation"] = config.legend.position
 
 
-def map_xy_config(config: XYChartConfig) -> Dict[str, Any]:
+def configure_temporal_handling(
+    form_data: Dict[str, Any],
+    x_is_temporal: bool,
+    time_grain: str | None,
+) -> None:
+    """Configure form_data based on whether x-axis column is temporal.
+
+    For temporal columns, enables standard time series handling.
+    For non-temporal columns (e.g., BIGINT year), disables DATE_TRUNC
+    by setting categorical sorting options.
+    """
+    if x_is_temporal:
+        if time_grain:
+            form_data["time_grain_sqla"] = time_grain
+    else:
+        # Non-temporal column - disable temporal handling to prevent DATE_TRUNC
+        form_data["x_axis_sort_series_type"] = "name"
+        form_data["x_axis_sort_series_ascending"] = True
+        form_data["time_grain_sqla"] = None
+        form_data["granularity_sqla"] = None
+
+
+def map_xy_config(
+    config: XYChartConfig, dataset_id: int | str | None = None
+) -> Dict[str, Any]:
     """Map XY chart config to form_data with defensive validation."""
     # Early validation to prevent empty charts
     if not config.y:
         raise ValueError("XY chart must have at least one Y-axis metric")
 
-    # Map chart kind to viz_type
+    # Check if x-axis column is truly temporal (based on actual SQL type)
+    x_is_temporal = is_column_truly_temporal(config.x.name, dataset_id)
+
+    # Map chart kind to viz_type - always use the same viz types
+    # The temporal vs non-temporal handling is done via form_data configuration
     viz_type_map = {
         "line": "echarts_timeseries_line",
         "bar": "echarts_timeseries_bar",
@@ -273,6 +379,14 @@ def map_xy_config(config: XYChartConfig) -> Dict[str, Any]:
         "scatter": "echarts_timeseries_scatter",
     }
 
+    if not x_is_temporal:
+        logger.info(
+            "X-axis column '%s' is not temporal (dataset_id=%s), "
+            "configuring as categorical dimension",
+            config.x.name,
+            dataset_id,
+        )
+
     # Convert Y columns to metrics with validation
     metrics = []
     for col in config.y:
@@ -286,13 +400,12 @@ def map_xy_config(config: XYChartConfig) -> Dict[str, Any]:
 
     form_data: Dict[str, Any] = {
         "viz_type": viz_type_map.get(config.kind, "echarts_timeseries_line"),
-        "x_axis": config.x.name,
         "metrics": metrics,
+        "x_axis": config.x.name,
     }
 
-    # Add time grain if specified (for temporal x-axis columns)
-    if config.time_grain:
-        form_data["time_grain_sqla"] = config.time_grain
+    # Configure temporal handling based on whether column is truly temporal
+    configure_temporal_handling(form_data, x_is_temporal, config.time_grain)
 
     # CRITICAL FIX: For time series charts, handle groupby carefully to avoid duplicates
     # The x_axis field already tells Superset which column to use for time grouping
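
Taken together, the new helpers decide in this order: the engine spec's
GenericDataType for the column's SQL type, then the is_dttm flag, then a
permissive temporal default. configure_temporal_handling itself is a pure
function, so a usage sketch (import path as introduced in this commit) runs
without an app context:

    from typing import Any

    from superset.mcp_service.chart.chart_utils import configure_temporal_handling

    # Temporal x-axis: only the requested grain is applied.
    temporal: dict[str, Any] = {}
    configure_temporal_handling(temporal, x_is_temporal=True, time_grain="P1M")
    assert temporal == {"time_grain_sqla": "P1M"}

    # Non-temporal x-axis (e.g. a BIGINT "year"): the grain is dropped and
    # categorical sorting is forced, so no DATE_TRUNC is generated.
    categorical: dict[str, Any] = {}
    configure_temporal_handling(categorical, x_is_temporal=False, time_grain="P1M")
    assert categorical == {
        "x_axis_sort_series_type": "name",
        "x_axis_sort_series_ascending": True,
        "time_grain_sqla": None,
        "granularity_sqla": None,
    }
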
diff --git a/superset/mcp_service/chart/tool/generate_chart.py b/superset/mcp_service/chart/tool/generate_chart.py
index bb8157adf4d..23eb23c01e5 100644
--- a/superset/mcp_service/chart/tool/generate_chart.py
+++ b/superset/mcp_service/chart/tool/generate_chart.py
@@ -174,7 +174,10 @@ async def generate_chart(  # noqa: C901
             )
 
         # Map the simplified config to Superset's form_data format
-        form_data = map_config_to_form_data(request.config)
+        # Pass dataset_id to enable column type checking for proper viz_type selection
+        form_data = map_config_to_form_data(
+            request.config, dataset_id=request.dataset_id
+        )
 
         chart = None
         chart_id = None
diff --git a/superset/mcp_service/chart/tool/update_chart.py b/superset/mcp_service/chart/tool/update_chart.py
index ed5355906ff..a1b720f380d 100644
--- a/superset/mcp_service/chart/tool/update_chart.py
+++ b/superset/mcp_service/chart/tool/update_chart.py
@@ -125,7 +125,9 @@ async def update_chart(
             )
 
         # Map the new config to form_data format
-        new_form_data = map_config_to_form_data(request.config)
+        # Get dataset_id from existing chart for column type checking
+        dataset_id = chart.datasource_id if chart.datasource_id else None
+        new_form_data = map_config_to_form_data(request.config, dataset_id=dataset_id)
 
         # Update chart using Superset's command
         from superset.commands.chart.update import UpdateChartCommand
diff --git a/superset/mcp_service/chart/tool/update_chart_preview.py b/superset/mcp_service/chart/tool/update_chart_preview.py
index 271709e230f..16893fcfbd0 100644
--- a/superset/mcp_service/chart/tool/update_chart_preview.py
+++ b/superset/mcp_service/chart/tool/update_chart_preview.py
@@ -66,7 +66,10 @@ def update_chart_preview(
 
     try:
         # Map the new config to form_data format
-        new_form_data = map_config_to_form_data(request.config)
+        # Pass dataset_id to enable column type checking for proper viz_type selection
+        new_form_data = map_config_to_form_data(
+            request.config, dataset_id=request.dataset_id
+        )
 
         # Generate new explore link with updated form_data
         explore_url = generate_explore_link(request.dataset_id, new_form_data)
diff --git a/superset/mcp_service/explore/tool/generate_explore_link.py b/superset/mcp_service/explore/tool/generate_explore_link.py
index a2deefbb272..d1f07daf929 100644
--- a/superset/mcp_service/explore/tool/generate_explore_link.py
+++ b/superset/mcp_service/explore/tool/generate_explore_link.py
@@ -92,7 +92,10 @@ async def generate_explore_link(
     try:
         await ctx.report_progress(1, 3, "Converting configuration to form data")
         # Map config to form_data using shared utilities
-        form_data = map_config_to_form_data(request.config)
+        # Pass dataset_id to enable column type checking for proper viz_type selection
+        form_data = map_config_to_form_data(
+            request.config, dataset_id=request.dataset_id
+        )
 
         # Add datasource to form_data for consistency with generate_chart
         # Only set if not already present to avoid overwriting
diff --git a/tests/unit_tests/mcp_service/chart/test_chart_utils.py b/tests/unit_tests/mcp_service/chart/test_chart_utils.py
index f2e9d966c89..eab027c3dca 100644
--- a/tests/unit_tests/mcp_service/chart/test_chart_utils.py
+++ b/tests/unit_tests/mcp_service/chart/test_chart_utils.py
@@ -17,14 +17,17 @@
 
 """Tests for chart utilities module"""
 
+from typing import Any
 from unittest.mock import patch
 
 import pytest
 
 from superset.mcp_service.chart.chart_utils import (
+    configure_temporal_handling,
     create_metric_object,
     generate_chart_name,
     generate_explore_link,
+    is_column_truly_temporal,
     map_config_to_form_data,
     map_filter_operator,
     map_table_config,
@@ -38,6 +41,7 @@ from superset.mcp_service.chart.schemas import (
     TableChartConfig,
     XYChartConfig,
 )
+from superset.utils.core import GenericDataType
 
 
 class TestCreateMetricObject:
@@ -423,7 +427,9 @@ class TestGenerateExploreLink:
         mock_get_base_url.return_value = "https://superset.example.com"
         form_data = {"viz_type": "table", "metrics": ["count"]}
 
-        result = generate_explore_link("123", form_data)
+        # Mock dataset not found to trigger fallback URL
+        with patch("superset.daos.dataset.DatasetDAO.find_by_id", return_value=None):
+            result = generate_explore_link("123", form_data)
 
         # Should use the configured base URL - use urlparse to avoid CodeQL warning
         parsed_url = urlparse(result)
@@ -471,13 +477,15 @@ class TestGenerateExploreLink:
 
     @patch("superset.mcp_service.chart.chart_utils.get_superset_base_url")
     def test_generate_explore_link_exception_handling(self, mock_get_base_url) -> None:
-        """Test generate_explore_link handles exceptions gracefully"""
+        """Test generate_explore_link handles SQLAlchemy exceptions gracefully"""
+        from sqlalchemy.exc import SQLAlchemyError
+
         mock_get_base_url.return_value = "http://localhost:9001"
 
-        # Mock exception during form_data creation
+        # Mock SQLAlchemy exception during dataset lookup
         with patch(
             "superset.daos.dataset.DatasetDAO.find_by_id",
-            side_effect=Exception("DB error"),
+            side_effect=SQLAlchemyError("DB error"),
         ):
             result = generate_explore_link("123", {"viz_type": "table"})
 
@@ -605,3 +613,268 @@ class TestCriticalBugFixes:
 
             form_data = map_xy_config(config)
             assert form_data["viz_type"] == expected_viz_type
+
+
+class TestIsColumnTrulyTemporal:
+    """Test is_column_truly_temporal function using db_engine_spec"""
+
+    def _create_mock_dataset(
+        self,
+        column_name: str,
+        column_type: str,
+        generic_type: GenericDataType,
+    ):
+        """Helper to create a mock dataset with proper db_engine_spec"""
+        from unittest.mock import MagicMock
+
+        from superset.utils.core import ColumnSpec
+
+        mock_column = MagicMock()
+        mock_column.column_name = column_name
+        mock_column.type = column_type
+
+        mock_db_engine_spec = MagicMock()
+        mock_column_spec = ColumnSpec(
+            sqla_type=MagicMock(), generic_type=generic_type, is_dttm=False
+        )
+        mock_db_engine_spec.get_column_spec.return_value = mock_column_spec
+
+        mock_database = MagicMock()
+        mock_database.db_engine_spec = mock_db_engine_spec
+
+        mock_dataset = MagicMock()
+        mock_dataset.columns = [mock_column]
+        mock_dataset.database = mock_database
+
+        return mock_dataset
+
+    def test_returns_true_when_no_dataset_id(self) -> None:
+        """Test returns True (default) when dataset_id is None"""
+        result = is_column_truly_temporal("year", None)
+        assert result is True
+
+    @patch("superset.daos.dataset.DatasetDAO")
+    def test_returns_true_when_dataset_not_found(self, mock_dao) -> None:
+        """Test returns True when dataset is not found"""
+        mock_dao.find_by_id.return_value = None
+        result = is_column_truly_temporal("year", 123)
+        assert result is True
+
+    @patch("superset.daos.dataset.DatasetDAO")
+    def test_returns_false_for_numeric_column(self, mock_dao) -> None:
+        """Test returns False for NUMERIC generic type (e.g., BIGINT)"""
+        mock_dataset = self._create_mock_dataset(
+            "year", "BIGINT", GenericDataType.NUMERIC
+        )
+        mock_dao.find_by_id.return_value = mock_dataset
+
+        result = is_column_truly_temporal("year", 123)
+        assert result is False
+
+    @patch("superset.daos.dataset.DatasetDAO")
+    def test_returns_false_for_integer_column(self, mock_dao) -> None:
+        """Test returns False for INTEGER column (NUMERIC generic type)"""
+        mock_dataset = self._create_mock_dataset(
+            "month", "INTEGER", GenericDataType.NUMERIC
+        )
+        mock_dao.find_by_id.return_value = mock_dataset
+
+        result = is_column_truly_temporal("month", 123)
+        assert result is False
+
+    @patch("superset.daos.dataset.DatasetDAO")
+    def test_returns_true_for_temporal_column(self, mock_dao) -> None:
+        """Test returns True for TEMPORAL generic type (e.g., TIMESTAMP)"""
+        mock_dataset = self._create_mock_dataset(
+            "created_at", "TIMESTAMP", GenericDataType.TEMPORAL
+        )
+        mock_dao.find_by_id.return_value = mock_dataset
+
+        result = is_column_truly_temporal("created_at", 123)
+        assert result is True
+
+    @patch("superset.daos.dataset.DatasetDAO")
+    def test_returns_true_for_date_column(self, mock_dao) -> None:
+        """Test returns True for DATE column (TEMPORAL generic type)"""
+        mock_dataset = self._create_mock_dataset(
+            "order_date", "DATE", GenericDataType.TEMPORAL
+        )
+        mock_dao.find_by_id.return_value = mock_dataset
+
+        result = is_column_truly_temporal("order_date", 123)
+        assert result is True
+
+    @patch("superset.daos.dataset.DatasetDAO")
+    def test_case_insensitive_column_name_lookup(self, mock_dao) -> None:
+        """Test column name lookup is case insensitive"""
+        mock_dataset = self._create_mock_dataset(
+            "Year", "BIGINT", GenericDataType.NUMERIC
+        )
+        mock_dao.find_by_id.return_value = mock_dataset
+
+        result = is_column_truly_temporal("year", 123)
+        assert result is False
+
+    @patch("superset.daos.dataset.DatasetDAO")
+    def test_returns_true_on_value_error(self, mock_dao) -> None:
+        """Test returns True (default) when ValueError occurs"""
+        mock_dao.find_by_id.side_effect = ValueError("Invalid ID")
+        result = is_column_truly_temporal("year", 123)
+        assert result is True
+
+    @patch("superset.daos.dataset.DatasetDAO")
+    def test_returns_true_on_attribute_error(self, mock_dao) -> None:
+        """Test returns True (default) when AttributeError occurs"""
+        mock_dao.find_by_id.side_effect = AttributeError("Missing attribute")
+        result = is_column_truly_temporal("year", 123)
+        assert result is True
+
+    @patch("superset.daos.dataset.DatasetDAO")
+    def test_handles_uuid_dataset_id(self, mock_dao) -> None:
+        """Test handles UUID string as dataset_id"""
+        mock_dataset = self._create_mock_dataset(
+            "year", "BIGINT", GenericDataType.NUMERIC
+        )
+        mock_dao.find_by_id.return_value = mock_dataset
+
+        result = is_column_truly_temporal("year", "abc-123-uuid")
+        assert result is False
+        mock_dao.find_by_id.assert_called_with("abc-123-uuid", id_column="uuid")
+
+    @patch("superset.daos.dataset.DatasetDAO")
+    def test_falls_back_to_is_dttm_when_no_column_spec(self, mock_dao) -> None:
+        """Test falls back to is_dttm flag when get_column_spec returns None"""
+        from unittest.mock import MagicMock
+
+        mock_column = MagicMock()
+        mock_column.column_name = "year"
+        mock_column.type = "UNKNOWN_TYPE"
+        mock_column.is_dttm = False
+
+        mock_db_engine_spec = MagicMock()
+        mock_db_engine_spec.get_column_spec.return_value = None
+
+        mock_database = MagicMock()
+        mock_database.db_engine_spec = mock_db_engine_spec
+
+        mock_dataset = MagicMock()
+        mock_dataset.columns = [mock_column]
+        mock_dataset.database = mock_database
+        mock_dao.find_by_id.return_value = mock_dataset
+
+        result = is_column_truly_temporal("year", 123)
+        assert result is False
+
+    @patch("superset.daos.dataset.DatasetDAO")
+    def test_falls_back_to_is_dttm_when_no_type(self, mock_dao) -> None:
+        """Test falls back to is_dttm flag when column has no type"""
+        from unittest.mock import MagicMock
+
+        mock_column = MagicMock()
+        mock_column.column_name = "year"
+        mock_column.type = None
+        mock_column.is_dttm = True
+
+        mock_dataset = MagicMock()
+        mock_dataset.columns = [mock_column]
+        mock_dao.find_by_id.return_value = mock_dataset
+
+        result = is_column_truly_temporal("year", 123)
+        assert result is True
+
+
+class TestConfigureTemporalHandling:
+    """Test configure_temporal_handling function"""
+
+    def test_temporal_column_with_time_grain(self) -> None:
+        """Test temporal column sets time_grain_sqla"""
+        form_data: dict[str, Any] = {}
+        configure_temporal_handling(form_data, x_is_temporal=True, time_grain="P1M")
+        assert form_data["time_grain_sqla"] == "P1M"
+
+    def test_temporal_column_without_time_grain(self) -> None:
+        """Test temporal column without time_grain doesn't set time_grain_sqla"""
+        form_data: dict[str, Any] = {}
+        configure_temporal_handling(form_data, x_is_temporal=True, time_grain=None)
+        assert "time_grain_sqla" not in form_data
+
+    def test_non_temporal_column_sets_categorical_config(self) -> None:
+        """Test non-temporal column sets categorical configuration"""
+        form_data: dict[str, Any] = {}
+        configure_temporal_handling(form_data, x_is_temporal=False, time_grain=None)
+
+        assert form_data["x_axis_sort_series_type"] == "name"
+        assert form_data["x_axis_sort_series_ascending"] is True
+        assert form_data["time_grain_sqla"] is None
+        assert form_data["granularity_sqla"] is None
+
+    def test_non_temporal_column_ignores_time_grain(self) -> None:
+        """Test non-temporal column ignores time_grain parameter"""
+        form_data: dict[str, Any] = {}
+        configure_temporal_handling(form_data, x_is_temporal=False, time_grain="P1M")
+
+        # Should still set categorical config, not time_grain
+        assert form_data["time_grain_sqla"] is None
+        assert form_data["x_axis_sort_series_type"] == "name"
+
+
+class TestMapXYConfigWithNonTemporalColumn:
+    """Test map_xy_config with non-temporal columns (DATE_TRUNC fix)"""
+
+    @patch("superset.mcp_service.chart.chart_utils.is_column_truly_temporal")
+    def test_non_temporal_column_disables_time_grain(self, mock_is_temporal) -> None:
+        """Test non-temporal column sets categorical config"""
+        mock_is_temporal.return_value = False
+
+        config = XYChartConfig(
+            chart_type="xy",
+            x=ColumnRef(name="year"),
+            y=[ColumnRef(name="sales", aggregate="SUM")],
+            kind="bar",
+        )
+
+        result = map_xy_config(config, dataset_id=123)
+
+        assert result["x_axis"] == "year"
+        assert result["x_axis_sort_series_type"] == "name"
+        assert result["x_axis_sort_series_ascending"] is True
+        assert result["time_grain_sqla"] is None
+        assert result["granularity_sqla"] is None
+
+    @patch("superset.mcp_service.chart.chart_utils.is_column_truly_temporal")
+    def test_temporal_column_allows_time_grain(self, mock_is_temporal) -> None:
+        """Test temporal column allows time_grain to be set"""
+        mock_is_temporal.return_value = True
+
+        config = XYChartConfig(
+            chart_type="xy",
+            x=ColumnRef(name="created_at"),
+            y=[ColumnRef(name="count", aggregate="COUNT")],
+            kind="line",
+            time_grain="P1W",
+        )
+
+        result = map_xy_config(config, dataset_id=123)
+
+        assert result["x_axis"] == "created_at"
+        assert result["time_grain_sqla"] == "P1W"
+        assert "x_axis_sort_series_type" not in result
+
+    @patch("superset.mcp_service.chart.chart_utils.is_column_truly_temporal")
+    def test_non_temporal_ignores_time_grain_param(self, mock_is_temporal) -> None:
+        """Test non-temporal column ignores time_grain even if specified"""
+        mock_is_temporal.return_value = False
+
+        config = XYChartConfig(
+            chart_type="xy",
+            x=ColumnRef(name="year"),
+            y=[ColumnRef(name="sales", aggregate="SUM")],
+            kind="bar",
+            time_grain="P1M",  # This should be ignored for non-temporal
+        )
+
+        result = map_xy_config(config, dataset_id=123)
+
+        # time_grain_sqla should be None, not P1M
+        assert result["time_grain_sqla"] is None
+        assert result["x_axis_sort_series_type"] == "name"
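
The new suites mock DatasetDAO and the engine spec, so they run as plain unit
tests; a targeted invocation could look like this (the pytest selection flags
are an assumption, not part of the commit):

    import pytest

    pytest.main([
        "tests/unit_tests/mcp_service/chart/test_chart_utils.py",
        "-k", "TrulyTemporal or TemporalHandling or NonTemporalColumn",
    ])
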
diff --git a/tests/unit_tests/mcp_service/explore/tool/test_generate_explore_link.py b/tests/unit_tests/mcp_service/explore/tool/test_generate_explore_link.py
index 04a249cd9fa..af08834d57b 100644
--- a/tests/unit_tests/mcp_service/explore/tool/test_generate_explore_link.py
+++ b/tests/unit_tests/mcp_service/explore/tool/test_generate_explore_link.py
@@ -68,9 +68,30 @@ def mock_webdriver_baseurl(app_context):
 
 
 def _mock_dataset(id: int = 1) -> Mock:
-    """Create a mock dataset object."""
+    """Create a mock dataset object with columns and db_engine_spec."""
+    from superset.utils.core import ColumnSpec, GenericDataType
+
+    # Create mock column that appears temporal
+    mock_column = Mock()
+    mock_column.column_name = "date"
+    mock_column.type = "TIMESTAMP"
+
+    # Create mock db_engine_spec
+    mock_db_engine_spec = Mock()
+    mock_column_spec = ColumnSpec(
+        sqla_type=Mock(), generic_type=GenericDataType.TEMPORAL, is_dttm=True
+    )
+    mock_db_engine_spec.get_column_spec.return_value = mock_column_spec
+
+    # Create mock database
+    mock_database = Mock()
+    mock_database.db_engine_spec = mock_db_engine_spec
+
+    # Create dataset with all required attributes
     dataset = Mock()
     dataset.id = id
+    dataset.columns = [mock_column]
+    dataset.database = mock_database
     return dataset
 
 
