This is an automated email from the ASF dual-hosted git repository.

beto pushed a commit to branch semantic-layer-feature
in repository https://gitbox.apache.org/repos/asf/superset.git

commit e418958e40acc208b28d79afddd5d7b0afc93363
Author: Beto Dealmeida <[email protected]>
AuthorDate: Fri Feb 6 11:16:09 2026 -0500

    Add mapper
---
 .../superset_core/semantic_layers/semantic_view.py |    2 +-
 superset/semantic_layers/__init__.py               |   16 +
 superset/semantic_layers/mapper.py                 |  939 ++++++++++++
 tests/unit_tests/semantic_layers/mapper_test.py    | 1586 ++++++++++++++++++++
 4 files changed, 2542 insertions(+), 1 deletion(-)

diff --git a/superset-core/src/superset_core/semantic_layers/semantic_view.py b/superset-core/src/superset_core/semantic_layers/semantic_view.py
index 11f041132e3..5ea8fc5b42d 100644
--- a/superset-core/src/superset_core/semantic_layers/semantic_view.py
+++ b/superset-core/src/superset_core/semantic_layers/semantic_view.py
@@ -20,7 +20,7 @@ from __future__ import annotations
 import enum
 from typing import Protocol, runtime_checkable
 
-from superset_core.semant_views.types import (
+from superset_core.semantic_layers.types import (
     AdhocFilter,
     Dimension,
     Filter,
diff --git a/superset/semantic_layers/__init__.py b/superset/semantic_layers/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/superset/semantic_layers/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/superset/semantic_layers/mapper.py b/superset/semantic_layers/mapper.py
new file mode 100644
index 00000000000..38bb96cb396
--- /dev/null
+++ b/superset/semantic_layers/mapper.py
@@ -0,0 +1,939 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Functions for mapping `QueryObject` to semantic layers.
+
+These functions validate and convert a `QueryObject` into one or more `SemanticQuery`,
+which are then passed to semantic layer implementations for execution, returning a
+single dataframe.
+
+"""
+
+from datetime import datetime, timedelta
+from time import time
+from typing import Any, cast, Sequence, TypeGuard
+
+import numpy as np
+from superset_core.semantic_layers.semantic_view import SemanticViewFeature
+from superset_core.semantic_layers.types import (
+    AdhocExpression,
+    AdhocFilter,
+    Day,
+    Dimension,
+    Filter,
+    FilterValues,
+    Grain,
+    GroupLimit,
+    Hour,
+    Metric,
+    Minute,
+    Month,
+    Operator,
+    OrderDirection,
+    OrderTuple,
+    PredicateType,
+    Quarter,
+    Second,
+    SemanticQuery,
+    SemanticResult,
+    Week,
+    Year,
+)
+
+from superset.common.db_query_status import QueryStatus
+from superset.common.query_object import QueryObject
+from superset.common.utils.time_range_utils import get_since_until_from_query_object
+from superset.connectors.sqla.models import BaseDatasource
+from superset.models.helpers import QueryResult
+from superset.superset_typing import AdhocColumn
+from superset.utils.core import (
+    FilterOperator,
+    QueryObjectFilterClause,
+    TIME_COMPARISON,
+)
+from superset.utils.date_parser import get_past_or_future
+
+
+class ValidatedQueryObjectFilterClause(QueryObjectFilterClause):
+    """
+    A validated QueryObject filter clause with a string column name.
+
+    The `col` in a `QueryObjectFilterClause` can be either a string (column name) or
+    an adhoc column, but we only support the former in semantic layers.
+    """
+
+    # overwrite to narrow type; mypy complains about more restrictive typed dicts,
+    # but the alternative would be to redefine the object
+    col: str  # type: ignore[misc]
+    op: str  # type: ignore[misc]
+
+
+class ValidatedQueryObject(QueryObject):
+    """
+    A query object that has a datasource defined.
+    """
+
+    datasource: BaseDatasource
+
+    # overwrite to narrow type; mypy complains about the assignment since the base
+    # type allows adhoc filters, but we only support validated filters here
+    filter: list[ValidatedQueryObjectFilterClause]  # type: ignore[assignment]
+    series_columns: Sequence[str]  # type: ignore[assignment]
+    series_limit_metric: str | None
+
+
+def get_results(query_object: QueryObject) -> QueryResult:
+    """
+    Run 1+ queries based on `QueryObject` and return the results.
+
+    :param query_object: The QueryObject containing query specifications
+    :return: QueryResult compatible with Superset's query interface
+    """
+    if not validate_query_object(query_object):
+        raise ValueError("QueryObject must have a datasource defined.")
+
+    # Track execution time
+    start_time = time()
+
+    semantic_view = query_object.datasource.implementation
+    dispatcher = (
+        semantic_view.get_row_count
+        if query_object.is_rowcount
+        else semantic_view.get_dataframe
+    )
+
+    # Step 1: Convert QueryObject to list of SemanticQuery objects
+    # The first query is the main query, subsequent queries are for time offsets
+    queries = map_query_object(query_object)
+
+    # Step 2: Execute the main query (first in the list)
+    main_query = queries[0]
+    main_result = dispatcher(
+        metrics=main_query.metrics,
+        dimensions=main_query.dimensions,
+        filters=main_query.filters,
+        order=main_query.order,
+        limit=main_query.limit,
+        offset=main_query.offset,
+        group_limit=main_query.group_limit,
+    )
+
+    main_df = main_result.results
+
+    # Collect all requests (SQL queries, HTTP requests, etc.) for troubleshooting
+    all_requests = list(main_result.requests)
+
+    # If no time offsets, return the main result as-is
+    if not query_object.time_offsets or len(queries) <= 1:
+        semantic_result = SemanticResult(
+            requests=all_requests,
+            results=main_df,
+        )
+        duration = timedelta(seconds=time() - start_time)
+        return map_semantic_result_to_query_result(
+            semantic_result,
+            query_object,
+            duration,
+        )
+
+    # Get metric names from the main query
+    # These are the columns that will be renamed with offset suffixes
+    metric_names = [metric.name for metric in main_query.metrics]
+
+    # Join keys are all columns except metrics
+    # These will be used to match rows between main and offset DataFrames
+    join_keys = [col for col in main_df.columns if col not in metric_names]
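+    # (e.g., with an illustrative metric "total_sales" and dimension columns
+    # ["category", "order_date"], join_keys would be ["category", "order_date"])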
+
+    # Step 3 & 4: Execute each time offset query and join results
+    for offset_query, time_offset in zip(
+        queries[1:],
+        query_object.time_offsets,
+        strict=False,
+    ):
+        # Execute the offset query
+        result = dispatcher(
+            metrics=offset_query.metrics,
+            dimensions=offset_query.dimensions,
+            filters=offset_query.filters,
+            order=offset_query.order,
+            limit=offset_query.limit,
+            offset=offset_query.offset,
+            group_limit=offset_query.group_limit,
+        )
+
+        # Add this query's requests to the collection
+        all_requests.extend(result.requests)
+
+        offset_df = result.results
+
+        # Handle empty results - add NaN columns directly instead of merging
+        # This avoids dtype mismatch issues with empty DataFrames
+        if offset_df.empty:
+            # Add offset metric columns with NaN values directly to main_df
+            for metric in metric_names:
+                offset_col_name = TIME_COMPARISON.join([metric, time_offset])
+                main_df[offset_col_name] = np.nan
+        else:
+            # Rename metric columns with time offset suffix
+            # Format: "{metric_name}__{time_offset}"
+            # Example: "revenue" -> "revenue__1 week ago"
+            offset_df = offset_df.rename(
+                columns={
+                    metric: TIME_COMPARISON.join([metric, time_offset])
+                    for metric in metric_names
+                }
+            )
+
+            # Step 5: Perform left join on dimension columns
+            # This preserves all rows from main_df and adds offset metrics
+            # where they match
+            main_df = main_df.merge(
+                offset_df,
+                on=join_keys,
+                how="left",
+                suffixes=("", "__duplicate"),
+            )
+
+            # Clean up any duplicate columns that might have been created
+            # (shouldn't happen with proper join keys, but defensive programming)
+            duplicate_cols = [
+                col for col in main_df.columns if col.endswith("__duplicate")
+            ]
+            if duplicate_cols:
+                main_df = main_df.drop(columns=duplicate_cols)
+
+    # Convert final result to QueryResult
+    semantic_result = SemanticResult(requests=all_requests, results=main_df)
+    duration = timedelta(seconds=time() - start_time)
+    return map_semantic_result_to_query_result(
+        semantic_result,
+        query_object,
+        duration,
+    )
+
+
+def map_semantic_result_to_query_result(
+    semantic_result: SemanticResult,
+    query_object: ValidatedQueryObject,
+    duration: timedelta,
+) -> QueryResult:
+    """
+    Convert a SemanticResult to a QueryResult.
+
+    :param semantic_result: Result from the semantic layer
+    :param query_object: Original QueryObject (for passthrough attributes)
+    :param duration: Time taken to execute the query
+    :return: QueryResult compatible with Superset's query interface
+    """
+    # Get the query string from requests (typically one or more SQL queries)
+    query_str = ""
+    if semantic_result.requests:
+        # Join all requests for display (could be multiple for time comparisons)
+        query_str = "\n\n".join(
+            f"-- {req.type}\n{req.definition}" for req in 
semantic_result.requests
+        )
+
+    return QueryResult(
+        # Core data
+        df=semantic_result.results,
+        query=query_str,
+        duration=duration,
+        # Template filters - not applicable to semantic layers
+        # (semantic layers don't use Jinja templates)
+        applied_template_filters=None,
+        # Filter columns - not applicable to semantic layers
+        # (semantic layers handle filter validation internally)
+        applied_filter_columns=None,
+        rejected_filter_columns=None,
+        # Status - always success if we got here
+        # (errors would raise exceptions before reaching this point)
+        status=QueryStatus.SUCCESS,
+        error_message=None,
+        errors=None,
+        # Time range - pass through from original query_object
+        from_dttm=query_object.from_dttm,
+        to_dttm=query_object.to_dttm,
+    )
+
+
+def _normalize_column(column: str | AdhocColumn, dimension_names: set[str]) -> str:
+    """
+    Normalize a column to its dimension name.
+
+    Columns can be either:
+    - A string (dimension name directly)
+    - An AdhocColumn with isColumnReference=True and sqlExpression containing the
+      dimension name
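+
+    For example (illustrative values), both forms normalize to "category":
+
+        >>> _normalize_column("category", {"category"})
+        'category'
+        >>> _normalize_column(
+        ...     {"isColumnReference": True, "sqlExpression": "category"},
+        ...     {"category"},
+        ... )
+        'category'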
+    """
+    if isinstance(column, str):
+        return column
+
+    # Handle column references (e.g., from time-series charts)
+    if column.get("isColumnReference") and (sql_expr := column.get("sqlExpression")):
+        if sql_expr in dimension_names:
+            return sql_expr
+
+    raise ValueError("Adhoc dimensions are not supported in Semantic Views.")
+
+
+def map_query_object(query_object: ValidatedQueryObject) -> list[SemanticQuery]:
+    """
+    Convert a `QueryObject` into a list of `SemanticQuery`.
+
+    This function maps the `QueryObject` into query objects that focus less on
+    visualization and more on semantics.
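+
+    For example (illustrative fields), a query object with metrics=["total_sales"],
+    columns=["category"], and time_offsets=["1 week ago"] is mapped to two
+    `SemanticQuery` objects: the main query, plus one whose time filters are shifted
+    back by one week.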
+    """
+    semantic_view = query_object.datasource.implementation
+
+    all_metrics = {metric.name: metric for metric in semantic_view.metrics}
+    all_dimensions = {
+        dimension.name: dimension for dimension in semantic_view.dimensions
+    }
+
+    # Normalize columns (may be dicts with isColumnReference=True for time-series)
+    dimension_names = set(all_dimensions.keys())
+    normalized_columns = {
+        _normalize_column(column, dimension_names) for column in query_object.columns
+    }
+
+    metrics = [all_metrics[metric] for metric in (query_object.metrics or [])]
+
+    grain = (
+        _convert_time_grain(query_object.extras["time_grain_sqla"])
+        if "time_grain_sqla" in query_object.extras
+        else None
+    )
+    dimensions = [
+        dimension
+        for dimension in semantic_view.dimensions
+        if dimension.name in normalized_columns
+        and (
+            # if a grain is specified, only include the time dimension if its grain
+            # matches the requested grain
+            grain is None
+            or dimension.name != query_object.granularity
+            or dimension.grain == grain
+        )
+    ]
+
+    order = _get_order_from_query_object(query_object, all_metrics, all_dimensions)
+    limit = query_object.row_limit
+    offset = query_object.row_offset
+
+    group_limit = _get_group_limit_from_query_object(
+        query_object,
+        all_metrics,
+        all_dimensions,
+    )
+
+    queries = []
+    for time_offset in [None] + query_object.time_offsets:
+        filters = _get_filters_from_query_object(
+            query_object,
+            time_offset,
+            all_dimensions,
+        )
+
+        queries.append(
+            SemanticQuery(
+                metrics=metrics,
+                dimensions=dimensions,
+                filters=filters,
+                order=order,
+                limit=limit,
+                offset=offset,
+                group_limit=group_limit,
+            )
+        )
+
+    return queries
+
+
+def _get_filters_from_query_object(
+    query_object: ValidatedQueryObject,
+    time_offset: str | None,
+    all_dimensions: dict[str, Dimension],
+) -> set[Filter | AdhocFilter]:
+    """
+    Extract all filters from the query object, including time range filters.
+
+    This simplifies the complexity of from_dttm/to_dttm/inner_from_dttm/inner_to_dttm
+    by converting all time constraints into filters.
+    """
+    filters: set[Filter | AdhocFilter] = set()
+
+    # 1. Add fetch values predicate if present
+    if (
+        query_object.apply_fetch_values_predicate
+        and query_object.datasource.fetch_values_predicate
+    ):
+        filters.add(
+            AdhocFilter(
+                type=PredicateType.WHERE,
+                definition=query_object.datasource.fetch_values_predicate,
+            )
+        )
+
+    # 2. Add time range filter based on from_dttm/to_dttm
+    # For time offsets, this automatically calculates the shifted bounds
+    time_filters = _get_time_filter(query_object, time_offset, all_dimensions)
+    filters.update(time_filters)
+
+    # 3. Add filters from query_object.extras (WHERE and HAVING clauses)
+    extras_filters = _get_filters_from_extras(query_object.extras)
+    filters.update(extras_filters)
+
+    # 4. Add all other filters from query_object.filter
+    for filter_ in query_object.filter:
+        # Skip temporal range filters - we're using inner bounds instead
+        if (
+            filter_.get("op") == FilterOperator.TEMPORAL_RANGE.value
+            and query_object.granularity
+        ):
+            continue
+
+        if converted_filters := _convert_query_object_filter(filter_, all_dimensions):
+            filters.update(converted_filters)
+
+    return filters
+
+
+def _get_filters_from_extras(extras: dict[str, Any]) -> set[AdhocFilter]:
+    """
+    Extract filters from the extras dict.
+
+    The extras dict can contain various keys that affect query behavior:
+
+    Supported keys (converted to filters):
+    - "where": SQL WHERE clause expression (e.g., "customer_id > 100")
+    - "having": SQL HAVING clause expression (e.g., "SUM(sales) > 1000")
+
+    Other keys in extras (handled elsewhere in the mapper):
+    - "time_grain_sqla": Time granularity (e.g., "P1D", "PT1H")
+      Handled in _convert_time_grain() and used for dimension grain matching
+
+    Note: The WHERE and HAVING clauses from extras are SQL expressions that
+    are passed through as-is to the semantic layer as AdhocFilter objects.
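+
+    For example (illustrative predicate):
+
+        >>> _get_filters_from_extras({"where": "customer_id > 100"}) == {
+        ...     AdhocFilter(type=PredicateType.WHERE, definition="customer_id > 100")
+        ... }
+        True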
+    """
+    filters: set[AdhocFilter] = set()
+
+    # Add WHERE clause from extras
+    if where_clause := extras.get("where"):
+        filters.add(
+            AdhocFilter(
+                type=PredicateType.WHERE,
+                definition=where_clause,
+            )
+        )
+
+    # Add HAVING clause from extras
+    if having_clause := extras.get("having"):
+        filters.add(
+            AdhocFilter(
+                type=PredicateType.HAVING,
+                definition=having_clause,
+            )
+        )
+
+    return filters
+
+
+def _get_time_filter(
+    query_object: ValidatedQueryObject,
+    time_offset: str | None,
+    all_dimensions: dict[str, Dimension],
+) -> set[Filter]:
+    """
+    Create a time range filter from the query object.
+
+    This handles both regular queries and time offset queries, simplifying the
+    complexity of from_dttm/to_dttm/inner_from_dttm/inner_to_dttm by using the
+    same time bounds for both the main query and series limit subqueries.
+    """
+    filters: set[Filter] = set()
+
+    if not query_object.granularity:
+        return filters
+
+    time_dimension = all_dimensions.get(query_object.granularity)
+    if not time_dimension:
+        return filters
+
+    # Get the appropriate time bounds based on whether this is a time offset query
+    from_dttm, to_dttm = _get_time_bounds(query_object, time_offset)
+
+    if not from_dttm or not to_dttm:
+        return filters
+
+    # Create a filter with >= and < operators
+    return {
+        Filter(
+            type=PredicateType.WHERE,
+            column=time_dimension,
+            operator=Operator.GREATER_THAN_OR_EQUAL,
+            value=from_dttm,
+        ),
+        Filter(
+            type=PredicateType.WHERE,
+            column=time_dimension,
+            operator=Operator.LESS_THAN,
+            value=to_dttm,
+        ),
+    }
+
+
+def _get_time_bounds(
+    query_object: ValidatedQueryObject,
+    time_offset: str | None,
+) -> tuple[datetime | None, datetime | None]:
+    """
+    Get the appropriate time bounds for the query.
+
+    For regular queries (time_offset is None), returns from_dttm/to_dttm.
+    For time offset queries, calculates the shifted bounds.
+
+    This simplifies the inner_from_dttm/inner_to_dttm complexity by using
+    the same bounds for both main queries and series limit subqueries (Option 1).
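+
+    For example (illustrative dates), with from_dttm=2025-10-15, to_dttm=2025-10-22,
+    and time_offset="1 week ago", the returned bounds are 2025-10-08 and 2025-10-15.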
+    """
+    if time_offset is None:
+        # Main query: use from_dttm/to_dttm directly
+        return query_object.from_dttm, query_object.to_dttm
+
+    # Time offset query: calculate shifted bounds
+    # Use from_dttm/to_dttm if available, otherwise try to get from time_range
+    outer_from = query_object.from_dttm
+    outer_to = query_object.to_dttm
+
+    if not outer_from or not outer_to:
+        # Fall back to parsing time_range if from_dttm/to_dttm not set
+        outer_from, outer_to = get_since_until_from_query_object(query_object)
+
+    if not outer_from or not outer_to:
+        return None, None
+
+    # Apply the offset to both bounds
+    offset_from = get_past_or_future(time_offset, outer_from)
+    offset_to = get_past_or_future(time_offset, outer_to)
+
+    return offset_from, offset_to
+
+
+def _convert_query_object_filter(
+    filter_: ValidatedQueryObjectFilterClause,
+    all_dimensions: dict[str, Dimension],
+) -> set[Filter] | None:
+    """
+    Convert a QueryObject filter dict to a set of semantic layer `Filter` objects.
+
+    Returns None if the column is not a known dimension or the operator is not
+    supported.
+    """
+    operator_str = filter_["op"]
+
+    # Handle simple column filters
+    col = filter_.get("col")
+    if col not in all_dimensions:
+        return None
+
+    dimension = all_dimensions[col]
+
+    val_str = filter_["val"]
+    value: FilterValues | frozenset[FilterValues]
+    if val_str is None:
+        value = None
+    elif isinstance(val_str, (list, tuple)):
+        value = frozenset(val_str)
+    else:
+        value = val_str
+
+    # Special case for temporal range
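+    # (an illustrative value "2025-01-01 : 2025-02-01" becomes a >= filter on the
+    # start and a < filter on the end)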
+    if operator_str == FilterOperator.TEMPORAL_RANGE.value:
+        if not isinstance(value, str):
+            return None
+        start, end = value.split(" : ")
+        return {
+            Filter(
+                type=PredicateType.WHERE,
+                column=dimension,
+                operator=Operator.GREATER_THAN_OR_EQUAL,
+                value=start,
+            ),
+            Filter(
+                type=PredicateType.WHERE,
+                column=dimension,
+                operator=Operator.LESS_THAN,
+                value=end,
+            ),
+        }
+
+    # Map QueryObject operators to semantic layer operators
+    operator_mapping = {
+        FilterOperator.EQUALS.value: Operator.EQUALS,
+        FilterOperator.NOT_EQUALS.value: Operator.NOT_EQUALS,
+        FilterOperator.GREATER_THAN.value: Operator.GREATER_THAN,
+        FilterOperator.LESS_THAN.value: Operator.LESS_THAN,
+        FilterOperator.GREATER_THAN_OR_EQUALS.value: Operator.GREATER_THAN_OR_EQUAL,
+        FilterOperator.LESS_THAN_OR_EQUALS.value: Operator.LESS_THAN_OR_EQUAL,
+        FilterOperator.IN.value: Operator.IN,
+        FilterOperator.NOT_IN.value: Operator.NOT_IN,
+        FilterOperator.LIKE.value: Operator.LIKE,
+        FilterOperator.NOT_LIKE.value: Operator.NOT_LIKE,
+        FilterOperator.IS_NULL.value: Operator.IS_NULL,
+        FilterOperator.IS_NOT_NULL.value: Operator.IS_NOT_NULL,
+    }
+
+    operator = operator_mapping.get(operator_str)
+    if not operator:
+        # Unknown operator - skip this filter
+        return None
+
+    return {
+        Filter(
+            type=PredicateType.WHERE,
+            column=dimension,
+            operator=operator,
+            value=value,
+        )
+    }
+
+
+def _get_order_from_query_object(
+    query_object: ValidatedQueryObject,
+    all_metrics: dict[str, Metric],
+    all_dimensions: dict[str, Dimension],
+) -> list[OrderTuple]:
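+    """
+    Convert the query object's orderby entries into semantic layer order tuples.
+
+    Adhoc SQL expressions become `AdhocExpression` entries, while known dimension
+    and metric names are resolved to their semantic layer objects.
+    """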
+    order: list[OrderTuple] = []
+    for element, ascending in query_object.orderby:
+        direction = OrderDirection.ASC if ascending else OrderDirection.DESC
+
+        # adhoc
+        if isinstance(element, dict):
+            if element["sqlExpression"] is not None:
+                order.append(
+                    (
+                        AdhocExpression(
+                            id=element["label"] or element["sqlExpression"],
+                            definition=element["sqlExpression"],
+                        ),
+                        direction,
+                    )
+                )
+        elif element in all_dimensions:
+            order.append((all_dimensions[element], direction))
+        elif element in all_metrics:
+            order.append((all_metrics[element], direction))
+
+    return order
+
+
+def _get_group_limit_from_query_object(
+    query_object: ValidatedQueryObject,
+    all_metrics: dict[str, Metric],
+    all_dimensions: dict[str, Dimension],
+) -> GroupLimit | None:
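+    """
+    Build a `GroupLimit` from the query object's series limit settings.
+
+    Returns None when no series limit applies (series_limit == 0 or no columns).
+    """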
+    # no limit
+    if query_object.series_limit == 0 or not query_object.columns:
+        return None
+
+    dimensions = [all_dimensions[dim_id] for dim_id in query_object.series_columns]
+    top = query_object.series_limit
+    metric = (
+        all_metrics[query_object.series_limit_metric]
+        if query_object.series_limit_metric
+        else None
+    )
+    direction = OrderDirection.DESC if query_object.order_desc else OrderDirection.ASC
+    group_others = query_object.group_others_when_limit_reached
+
+    # Check if we need separate filters for the group limit subquery
+    # This happens when inner_from_dttm/inner_to_dttm differ from from_dttm/to_dttm
+    group_limit_filters = _get_group_limit_filters(query_object, all_dimensions)
+
+    return GroupLimit(
+        dimensions=dimensions,
+        top=top,
+        metric=metric,
+        direction=direction,
+        group_others=group_others,
+        filters=group_limit_filters,
+    )
+
+
+def _get_group_limit_filters(
+    query_object: ValidatedQueryObject,
+    all_dimensions: dict[str, Dimension],
+) -> set[Filter | AdhocFilter] | None:
+    """
+    Get separate filters for the group limit subquery if needed.
+
+    This is used when inner_from_dttm/inner_to_dttm differ from from_dttm/to_dttm,
+    which happens during time comparison queries. The group limit subquery may need
+    different time bounds to determine the top N groups.
+
+    Returns None if the group limit should use the same filters as the main query.
+    """
+    # Check if inner time bounds are explicitly set and differ from outer bounds
+    if (
+        query_object.inner_from_dttm is None
+        or query_object.inner_to_dttm is None
+        or (
+            query_object.inner_from_dttm == query_object.from_dttm
+            and query_object.inner_to_dttm == query_object.to_dttm
+        )
+    ):
+        # No separate bounds needed - use the same filters as the main query
+        return None
+
+    # Create separate filters for the group limit subquery
+    filters: set[Filter | AdhocFilter] = set()
+
+    # Add time range filter using inner bounds
+    if query_object.granularity:
+        time_dimension = all_dimensions.get(query_object.granularity)
+        if (
+            time_dimension
+            and query_object.inner_from_dttm
+            and query_object.inner_to_dttm
+        ):
+            filters.update(
+                {
+                    Filter(
+                        type=PredicateType.WHERE,
+                        column=time_dimension,
+                        operator=Operator.GREATER_THAN_OR_EQUAL,
+                        value=query_object.inner_from_dttm,
+                    ),
+                    Filter(
+                        type=PredicateType.WHERE,
+                        column=time_dimension,
+                        operator=Operator.LESS_THAN,
+                        value=query_object.inner_to_dttm,
+                    ),
+                }
+            )
+
+    # Add fetch values predicate if present
+    if (
+        query_object.apply_fetch_values_predicate
+        and query_object.datasource.fetch_values_predicate
+    ):
+        filters.add(
+            AdhocFilter(
+                type=PredicateType.WHERE,
+                definition=query_object.datasource.fetch_values_predicate,
+            )
+        )
+
+    # Add filters from query_object.extras (WHERE and HAVING clauses)
+    extras_filters = _get_filters_from_extras(query_object.extras)
+    filters.update(extras_filters)
+
+    # Add all other non-temporal filters from query_object.filter
+    for filter_ in query_object.filter:
+        # Skip temporal range filters - we're using inner bounds instead
+        if (
+            filter_.get("op") == FilterOperator.TEMPORAL_RANGE.value
+            and query_object.granularity
+        ):
+            continue
+
+        if converted_filters := _convert_query_object_filter(filter_, all_dimensions):
+            filters.update(converted_filters)
+
+    return filters if filters else None
+
+
+def _convert_time_grain(time_grain: str) -> Grain | None:
+    """
+    Convert a time grain string from the query object to a Grain enum.
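+
+    For example:
+
+        >>> _convert_time_grain("P1D") == Day
+        True
+        >>> _convert_time_grain("INVALID") is None
+        True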
+    """
+    mapping = {
+        grain.representation: grain
+        for grain in [
+            Second,
+            Minute,
+            Hour,
+            Day,
+            Week,
+            Month,
+            Quarter,
+            Year,
+        ]
+    }
+
+    return mapping.get(time_grain)
+
+
+def validate_query_object(
+    query_object: QueryObject,
+) -> TypeGuard[ValidatedQueryObject]:
+    """
+    Validate that the `QueryObject` is compatible with the `SemanticView`.
+
+    If a semantic view implementation supports features that are currently rejected
+    here, we should add an attribute to the `SemanticViewImplementation` to indicate
+    support for them.
+    """
+    if not query_object.datasource:
+        return False
+
+    query_object = cast(ValidatedQueryObject, query_object)
+
+    _validate_metrics(query_object)
+    _validate_dimensions(query_object)
+    _validate_filters(query_object)
+    _validate_granularity(query_object)
+    _validate_group_limit(query_object)
+    _validate_orderby(query_object)
+
+    return True
+
+
+def _validate_metrics(query_object: ValidatedQueryObject) -> None:
+    """
+    Make sure metrics are defined in the semantic view.
+    """
+    semantic_view = query_object.datasource.implementation
+
+    if any(not isinstance(metric, str) for metric in (query_object.metrics or [])):
+        raise ValueError("Adhoc metrics are not supported in Semantic Views.")
+
+    metric_names = {metric.name for metric in semantic_view.metrics}
+    if not set(query_object.metrics or []) <= metric_names:
+        raise ValueError("All metrics must be defined in the Semantic View.")
+
+
+def _validate_dimensions(query_object: ValidatedQueryObject) -> None:
+    """
+    Make sure all dimensions are defined in the semantic view.
+    """
+    semantic_view = query_object.datasource.implementation
+    dimension_names = {dimension.name for dimension in semantic_view.dimensions}
+
+    # Normalize all columns to dimension names
+    normalized_columns = [
+        _normalize_column(column, dimension_names) for column in query_object.columns
+    ]
+
+    if not set(normalized_columns) <= dimension_names:
+        raise ValueError("All dimensions must be defined in the Semantic 
View.")
+
+
+def _validate_filters(query_object: ValidatedQueryObject) -> None:
+    """
+    Make sure all filters are valid.
+    """
+    for filter_ in query_object.filter:
+        if isinstance(filter_["col"], dict):
+            raise ValueError(
+                "Adhoc columns are not supported in Semantic View filters."
+            )
+        if not filter_.get("op"):
+            raise ValueError("All filters must have an operator defined.")
+
+
+def _validate_granularity(query_object: ValidatedQueryObject) -> None:
+    """
+    Make sure time column and time grain are valid.
+    """
+    semantic_view = query_object.datasource.implementation
+    dimension_names = {dimension.name for dimension in semantic_view.dimensions}
+
+    if time_column := query_object.granularity:
+        if time_column not in dimension_names:
+            raise ValueError(
+                "The time column must be defined in the Semantic View 
dimensions."
+            )
+
+    if time_grain := query_object.extras.get("time_grain_sqla"):
+        if not time_column:
+            raise ValueError(
+                "A time column must be specified when a time grain is 
provided."
+            )
+
+        supported_time_grains = {
+            dimension.grain
+            for dimension in semantic_view.dimensions
+            if dimension.name == time_column and dimension.grain
+        }
+        if _convert_time_grain(time_grain) not in supported_time_grains:
+            raise ValueError(
+                "The time grain is not supported for the time column in the "
+                "Semantic View."
+            )
+
+
+def _validate_group_limit(query_object: ValidatedQueryObject) -> None:
+    """
+    Validate group limit related features in the query object.
+    """
+    semantic_view = query_object.datasource.implementation
+
+    # no limit
+    if query_object.series_limit == 0:
+        return
+
+    if (
+        query_object.series_columns
+        and SemanticViewFeature.GROUP_LIMIT not in semantic_view.features
+    ):
+        raise ValueError("Group limit is not supported in this Semantic View.")
+
+    if any(not isinstance(col, str) for col in query_object.series_columns):
+        raise ValueError("Adhoc dimensions are not supported in series 
columns.")
+
+    metric_names = {metric.name for metric in semantic_view.metrics}
+    if query_object.series_limit_metric and (
+        not isinstance(query_object.series_limit_metric, str)
+        or query_object.series_limit_metric not in metric_names
+    ):
+        raise ValueError(
+            "The series limit metric must be defined in the Semantic View."
+        )
+
+    dimension_names = {dimension.name for dimension in semantic_view.dimensions}
+    if not set(query_object.series_columns) <= dimension_names:
+        raise ValueError("All series columns must be defined in the Semantic 
View.")
+
+    if (
+        query_object.group_others_when_limit_reached
+        and SemanticViewFeature.GROUP_OTHERS not in semantic_view.features
+    ):
+        raise ValueError(
+            "Grouping others when limit is reached is not supported in this 
Semantic "
+            "View."
+        )
+
+
+def _validate_orderby(query_object: ValidatedQueryObject) -> None:
+    """
+    Validate order by elements in the query object.
+    """
+    semantic_view = query_object.datasource.implementation
+
+    if (
+        any(not isinstance(element, str) for element, _ in query_object.orderby)
+        and SemanticViewFeature.ADHOC_EXPRESSIONS_IN_ORDERBY
+        not in semantic_view.features
+    ):
+        raise ValueError(
+            "Adhoc expressions in order by are not supported in this Semantic 
View."
+        )
+
+    elements = {orderby[0] for orderby in query_object.orderby}
+    metric_names = {metric.name for metric in semantic_view.metrics}
+    dimension_names = {dimension.name for dimension in semantic_view.dimensions}
+    if not elements <= metric_names | dimension_names:
+        raise ValueError("All order by elements must be defined in the 
Semantic View.")
diff --git a/tests/unit_tests/semantic_layers/mapper_test.py b/tests/unit_tests/semantic_layers/mapper_test.py
new file mode 100644
index 00000000000..9e5c10d50a6
--- /dev/null
+++ b/tests/unit_tests/semantic_layers/mapper_test.py
@@ -0,0 +1,1586 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime
+from unittest.mock import MagicMock
+
+import pandas as pd
+import pytest
+from pytest_mock import MockerFixture
+from superset_core.semantic_layers.semantic_view import SemanticViewFeature
+from superset_core.semantic_layers.types import (
+    AdhocExpression,
+    AdhocFilter,
+    Day,
+    Dimension,
+    Filter,
+    Grain,
+    GroupLimit,
+    Hour,
+    INTEGER,
+    Metric,
+    Minute,
+    Month,
+    NUMBER,
+    Operator,
+    OrderDirection,
+    PredicateType,
+    Quarter,
+    Second,
+    SemanticQuery,
+    SemanticRequest,
+    SemanticResult,
+    STRING,
+    Week,
+    Year,
+)
+
+from superset.semantic_layers.mapper import (
+    _convert_query_object_filter,
+    _convert_time_grain,
+    _get_filters_from_extras,
+    _get_filters_from_query_object,
+    _get_group_limit_filters,
+    _get_group_limit_from_query_object,
+    _get_order_from_query_object,
+    _get_time_bounds,
+    _get_time_filter,
+    get_results,
+    map_query_object,
+    validate_query_object,
+    ValidatedQueryObject,
+    ValidatedQueryObjectFilterClause,
+)
+from superset.utils.core import FilterOperator
+
+# Alias for convenience
+Feature = SemanticViewFeature
+
+
+class MockSemanticView:
+    """
+    Mock implementation of SemanticView protocol.
+    """
+
+    def __init__(
+        self,
+        dimensions: set[Dimension],
+        metrics: set[Metric],
+        features: frozenset[SemanticViewFeature],
+    ):
+        self.dimensions = dimensions
+        self.metrics = metrics
+        self.features = features
+
+    def uid(self) -> str:
+        return "mock_semantic_view"
+
+    def get_dimensions(self) -> set[Dimension]:
+        return self.dimensions
+
+    def get_metrics(self) -> set[Metric]:
+        return self.metrics
+
+
[email protected]
+def mock_datasource(mocker: MockerFixture) -> MagicMock:
+    """
+    Create a mock datasource with semantic view implementation.
+    """
+    datasource = mocker.Mock()
+
+    # Create dimensions
+    time_dim = Dimension(
+        id="orders.order_date",
+        name="order_date",
+        type=STRING,
+        description="Order date",
+        definition="order_date",
+    )
+    category_dim = Dimension(
+        id="products.category",
+        name="category",
+        type=STRING,
+        description="Product category",
+        definition="category",
+    )
+    region_dim = Dimension(
+        id="customers.region",
+        name="region",
+        type=STRING,
+        description="Customer region",
+        definition="region",
+    )
+
+    # Create metrics
+    sales_metric = Metric(
+        id="orders.total_sales",
+        name="total_sales",
+        type=NUMBER,
+        definition="SUM(amount)",
+        description="Total sales",
+    )
+    count_metric = Metric(
+        id="orders.order_count",
+        name="order_count",
+        type=INTEGER,
+        definition="COUNT(*)",
+        description="Order count",
+    )
+
+    # Create semantic view implementation
+    implementation = MockSemanticView(
+        dimensions={time_dim, category_dim, region_dim},
+        metrics={sales_metric, count_metric},
+        features=frozenset(
+            {
+                SemanticViewFeature.GROUP_LIMIT,
+                SemanticViewFeature.GROUP_OTHERS,
+            }
+        ),
+    )
+
+    datasource.implementation = implementation
+    datasource.fetch_values_predicate = None
+
+    return datasource
+
+
[email protected](
+    "input_grain, expected_grain",
+    [
+        ("PT1S", Second),
+        ("PT1M", Minute),
+        ("PT1H", Hour),
+        ("P1D", Day),
+        ("P1W", Week),
+        ("P1M", Month),
+        ("P1Y", Year),
+        ("P3M", Quarter),
+        ("INVALID", None),
+        ("", None),
+    ],
+)
+def test_convert_date_time_grain(
+    input_grain: str,
+    expected_grain: Grain,
+) -> None:
+    """
+    Test conversion of time grain strings to Grain types.
+    """
+    assert _convert_time_grain(input_grain) == expected_grain
+
+
+def test_get_filters_from_extras_empty() -> None:
+    """
+    Test that empty extras returns empty set.
+    """
+    result = _get_filters_from_extras({})
+    assert result == set()
+
+
+def test_get_filters_from_extras_where() -> None:
+    """
+    Test extraction of WHERE clause from extras.
+    """
+    extras = {"where": "customer_id > 100"}
+    result = _get_filters_from_extras(extras)
+
+    assert len(result) == 1
+    filter_ = next(iter(result))
+    assert isinstance(filter_, AdhocFilter)
+    assert filter_.type == PredicateType.WHERE
+    assert filter_.definition == "customer_id > 100"
+
+
+def test_get_filters_from_extras_having() -> None:
+    """
+    Test extraction of HAVING clause from extras.
+    """
+    extras = {"having": "SUM(sales) > 1000"}
+    result = _get_filters_from_extras(extras)
+
+    assert result == {
+        AdhocFilter(type=PredicateType.HAVING, definition="SUM(sales) > 1000"),
+    }
+
+
+def test_get_filters_from_extras_both() -> None:
+    """
+    Test extraction of both WHERE and HAVING from extras.
+    """
+    extras = {
+        "where": "region = 'US'",
+        "having": "COUNT(*) > 10",
+    }
+    result = _get_filters_from_extras(extras)
+
+    assert result == {
+        AdhocFilter(type=PredicateType.WHERE, definition="region = 'US'"),
+        AdhocFilter(type=PredicateType.HAVING, definition="COUNT(*) > 10"),
+    }
+
+
+def test_get_time_bounds_no_offset(mock_datasource: MagicMock) -> None:
+    """
+    Test time bounds without offset.
+    """
+    from_dttm = datetime(2025, 10, 15, 0, 0, 0)
+    to_dttm = datetime(2025, 10, 22, 23, 59, 59)
+
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        from_dttm=from_dttm,
+        to_dttm=to_dttm,
+        metrics=["total_sales"],
+        columns=["category"],
+    )
+
+    result_from, result_to = _get_time_bounds(query_object, None)
+
+    assert result_from == from_dttm
+    assert result_to == to_dttm
+
+
+def test_get_time_filter_no_granularity(mock_datasource: MagicMock) -> None:
+    """
+    Test that no time filter is created without granularity.
+    """
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        from_dttm=datetime(2025, 10, 15),
+        to_dttm=datetime(2025, 10, 22),
+        metrics=["total_sales"],
+        columns=["category"],
+        granularity=None,
+    )
+
+    all_dimensions = {
+        dim.name: dim for dim in mock_datasource.implementation.dimensions
+    }
+
+    result = _get_time_filter(query_object, None, all_dimensions)
+
+    assert result == set()
+
+
+def test_get_time_filter_with_granularity(mock_datasource: MagicMock) -> None:
+    """
+    Test time filter creation with granularity.
+    """
+    from_dttm = datetime(2025, 10, 15, 0, 0, 0)
+    to_dttm = datetime(2025, 10, 22, 23, 59, 59)
+
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        from_dttm=from_dttm,
+        to_dttm=to_dttm,
+        metrics=["total_sales"],
+        columns=["order_date", "category"],
+        granularity="order_date",
+    )
+
+    all_dimensions = {
+        dim.name: dim for dim in mock_datasource.implementation.dimensions
+    }
+
+    result = _get_time_filter(query_object, None, all_dimensions)
+
+    assert result == {
+        Filter(
+            type=PredicateType.WHERE,
+            column=all_dimensions["order_date"],
+            operator=Operator.GREATER_THAN_OR_EQUAL,
+            value=from_dttm,
+        ),
+        Filter(
+            type=PredicateType.WHERE,
+            column=all_dimensions["order_date"],
+            operator=Operator.LESS_THAN,
+            value=to_dttm,
+        ),
+    }
+
+
+def test_convert_query_object_filter_temporal_range() -> None:
+    """
+    Test that TEMPORAL_RANGE filters are skipped.
+    """
+    all_dimensions: dict[str, Dimension] = {}
+    filter_: ValidatedQueryObjectFilterClause = {
+        "op": FilterOperator.TEMPORAL_RANGE.value,
+        "col": "order_date",
+        "val": "Last 7 days",
+    }
+
+    result = _convert_query_object_filter(filter_, all_dimensions)
+
+    assert result is None
+
+
+def test_convert_query_object_filter_in(mock_datasource: MagicMock) -> None:
+    """
+    Test conversion of IN filter.
+    """
+    all_dimensions = {
+        dim.name: dim for dim in mock_datasource.implementation.dimensions
+    }
+    filter_: ValidatedQueryObjectFilterClause = {
+        "op": FilterOperator.IN.value,
+        "col": "category",
+        "val": ["Electronics", "Books"],
+    }
+
+    result = _convert_query_object_filter(filter_, all_dimensions)
+
+    assert result == {
+        Filter(
+            type=PredicateType.WHERE,
+            column=all_dimensions["category"],
+            operator=Operator.IN,
+            value=frozenset({"Electronics", "Books"}),
+        )
+    }
+
+
+def test_convert_query_object_filter_is_null(mock_datasource: MagicMock) -> None:
+    """
+    Test conversion of IS_NULL filter.
+    """
+    all_dimensions = {
+        dim.name: dim for dim in mock_datasource.implementation.dimensions
+    }
+    filter_: ValidatedQueryObjectFilterClause = {
+        "op": FilterOperator.IS_NULL.value,
+        "col": "region",
+        "val": None,
+    }
+
+    result = _convert_query_object_filter(filter_, all_dimensions)
+
+    assert result == {
+        Filter(
+            type=PredicateType.WHERE,
+            column=all_dimensions["region"],
+            operator=Operator.IS_NULL,
+            value=None,
+        )
+    }
+
+
+def test_get_filters_from_query_object_basic(mock_datasource: MagicMock) -> None:
+    """
+    Test basic filter extraction from query object.
+    """
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        from_dttm=datetime(2025, 10, 15),
+        to_dttm=datetime(2025, 10, 22),
+        metrics=["total_sales"],
+        columns=["order_date", "category"],
+        granularity="order_date",
+    )
+
+    all_dimensions = {
+        dim.name: dim for dim in mock_datasource.implementation.dimensions
+    }
+
+    result = _get_filters_from_query_object(query_object, None, all_dimensions)
+
+    assert result == {
+        Filter(
+            type=PredicateType.WHERE,
+            column=all_dimensions["order_date"],
+            operator=Operator.GREATER_THAN_OR_EQUAL,
+            value=datetime(2025, 10, 15),
+        ),
+        Filter(
+            type=PredicateType.WHERE,
+            column=all_dimensions["order_date"],
+            operator=Operator.LESS_THAN,
+            value=datetime(2025, 10, 22),
+        ),
+    }
+
+
+def test_get_filters_from_query_object_with_extras(mock_datasource: MagicMock) -> None:
+    """
+    Test filter extraction with extras.
+    """
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        from_dttm=datetime(2025, 10, 15),
+        to_dttm=datetime(2025, 10, 22),
+        metrics=["total_sales"],
+        columns=["category"],
+        granularity="order_date",
+        extras={"where": "customer_id > 100"},
+    )
+
+    all_dimensions = {
+        dim.name: dim for dim in mock_datasource.implementation.dimensions
+    }
+
+    result = _get_filters_from_query_object(query_object, None, all_dimensions)
+
+    assert result == {
+        Filter(
+            type=PredicateType.WHERE,
+            column=all_dimensions["order_date"],
+            operator=Operator.GREATER_THAN_OR_EQUAL,
+            value=datetime(2025, 10, 15),
+        ),
+        Filter(
+            type=PredicateType.WHERE,
+            column=all_dimensions["order_date"],
+            operator=Operator.LESS_THAN,
+            value=datetime(2025, 10, 22),
+        ),
+        AdhocFilter(
+            type=PredicateType.WHERE,
+            definition="customer_id > 100",
+        ),
+    }
+
+
+def test_get_filters_from_query_object_with_fetch_values(
+    mock_datasource: MagicMock,
+) -> None:
+    """
+    Test filter extraction with fetch values predicate.
+    """
+    mock_datasource.fetch_values_predicate = "tenant_id = 123"
+
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        from_dttm=datetime(2025, 10, 15),
+        to_dttm=datetime(2025, 10, 22),
+        metrics=["total_sales"],
+        columns=["category"],
+        granularity="order_date",
+        apply_fetch_values_predicate=True,
+    )
+
+    all_dimensions = {
+        dim.name: dim for dim in mock_datasource.implementation.dimensions
+    }
+
+    result = _get_filters_from_query_object(query_object, None, all_dimensions)
+
+    assert result == {
+        Filter(
+            type=PredicateType.WHERE,
+            column=all_dimensions["order_date"],
+            operator=Operator.GREATER_THAN_OR_EQUAL,
+            value=datetime(2025, 10, 15),
+        ),
+        Filter(
+            type=PredicateType.WHERE,
+            column=all_dimensions["order_date"],
+            operator=Operator.LESS_THAN,
+            value=datetime(2025, 10, 22),
+        ),
+        AdhocFilter(
+            type=PredicateType.WHERE,
+            definition="tenant_id = 123",
+        ),
+    }
+
+
+def test_get_order_from_query_object_metric(mock_datasource: MagicMock) -> None:
+    """
+    Test order extraction with metric.
+    """
+    all_metrics = {
+        metric.name: metric for metric in mock_datasource.implementation.metrics
+    }
+    all_dimensions = {
+        dim.name: dim for dim in mock_datasource.implementation.dimensions
+    }
+
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        metrics=["total_sales"],
+        columns=["category"],
+        orderby=[("total_sales", False)],  # DESC
+    )
+
+    result = _get_order_from_query_object(query_object, all_metrics, all_dimensions)
+
+    assert result == [(all_metrics["total_sales"], OrderDirection.DESC)]
+
+
+def test_get_order_from_query_object_dimension(mock_datasource: MagicMock) -> None:
+    """
+    Test order extraction with dimension.
+    """
+    all_metrics = {
+        metric.name: metric for metric in mock_datasource.implementation.metrics
+    }
+    all_dimensions = {
+        dim.name: dim for dim in mock_datasource.implementation.dimensions
+    }
+
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        metrics=["total_sales"],
+        columns=["category"],
+        orderby=[("category", True)],  # ASC
+    )
+
+    result = _get_order_from_query_object(query_object, all_metrics, all_dimensions)
+
+    assert result == [(all_dimensions["category"], OrderDirection.ASC)]
+
+
+def test_get_order_from_query_object_adhoc(mock_datasource: MagicMock) -> None:
+    """
+    Test order extraction with adhoc expression.
+    """
+    all_metrics = {
+        metric.name: metric for metric in mock_datasource.implementation.metrics
+    }
+    all_dimensions = {
+        dim.name: dim for dim in mock_datasource.implementation.dimensions
+    }
+
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        metrics=["total_sales"],
+        columns=["category"],
+        orderby=[({"label": "custom_order", "sqlExpression": "RAND()"}, True)],
+    )
+
+    result = _get_order_from_query_object(query_object, all_metrics, all_dimensions)
+
+    assert result == [
+        (
+            AdhocExpression(
+                id="custom_order",
+                definition="RAND()",
+            ),
+            OrderDirection.ASC,
+        )
+    ]
+
+
+def test_get_group_limit_from_query_object_none(mock_datasource: MagicMock) -> None:
+    """
+    Test that None is returned with no columns.
+    """
+    all_metrics = {
+        metric.name: metric for metric in mock_datasource.implementation.metrics
+    }
+    all_dimensions = {
+        dim.name: dim for dim in mock_datasource.implementation.dimensions
+    }
+
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        metrics=["total_sales"],
+        columns=[],  # No columns
+    )
+
+    result = _get_group_limit_from_query_object(
+        query_object,
+        all_metrics,
+        all_dimensions,
+    )
+
+    assert result is None
+
+
+def test_get_group_limit_from_query_object_basic(mock_datasource: MagicMock) -> None:
+    """
+    Test basic group limit creation.
+    """
+    all_metrics = {
+        metric.name: metric for metric in mock_datasource.implementation.metrics
+    }
+    all_dimensions = {
+        dim.name: dim for dim in mock_datasource.implementation.dimensions
+    }
+
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        metrics=["total_sales"],
+        columns=["category", "region"],
+        series_columns=["category"],
+        series_limit=10,
+        series_limit_metric="total_sales",
+        order_desc=True,
+    )
+
+    result = _get_group_limit_from_query_object(
+        query_object,
+        all_metrics,
+        all_dimensions,
+    )
+
+    assert result == GroupLimit(
+        top=10,
+        dimensions=[all_dimensions["category"]],
+        metric=all_metrics["total_sales"],
+        direction=OrderDirection.DESC,
+        group_others=False,
+        filters=None,
+    )
+
+
+def test_get_group_limit_from_query_object_with_group_others(
+    mock_datasource: MagicMock,
+) -> None:
+    """
+    Test group limit with group_others enabled.
+    """
+    all_metrics = {
+        metric.name: metric for metric in mock_datasource.implementation.metrics
+    }
+    all_dimensions = {
+        dim.name: dim for dim in mock_datasource.implementation.dimensions
+    }
+
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        metrics=["total_sales"],
+        columns=["category"],
+        series_columns=["category"],
+        series_limit=5,
+        series_limit_metric="total_sales",
+        group_others_when_limit_reached=True,
+    )
+
+    result = _get_group_limit_from_query_object(
+        query_object,
+        all_metrics,
+        all_dimensions,
+    )
+
+    assert result
+    assert result.group_others is True
+
+
+def test_get_group_limit_filters_no_inner_bounds(mock_datasource: MagicMock) -> None:
+    """
+    Test that None is returned when no inner bounds.
+    """
+    all_dimensions = {
+        dim.name: dim for dim in mock_datasource.implementation.dimensions
+    }
+
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        from_dttm=datetime(2025, 10, 15),
+        to_dttm=datetime(2025, 10, 22),
+        inner_from_dttm=None,
+        inner_to_dttm=None,
+        metrics=["total_sales"],
+        columns=["category"],
+    )
+
+    result = _get_group_limit_filters(query_object, all_dimensions)
+
+    assert result is None
+
+
+def test_get_group_limit_filters_same_bounds(mock_datasource: MagicMock) -> None:
+    """
+    Test that None is returned when inner bounds equal outer bounds.
+    """
+    all_dimensions = {
+        dim.name: dim for dim in mock_datasource.implementation.dimensions
+    }
+
+    from_dttm = datetime(2025, 10, 15)
+    to_dttm = datetime(2025, 10, 22)
+
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        from_dttm=from_dttm,
+        to_dttm=to_dttm,
+        inner_from_dttm=from_dttm,  # Same
+        inner_to_dttm=to_dttm,  # Same
+        metrics=["total_sales"],
+        columns=["category"],
+        granularity="order_date",
+    )
+
+    result = _get_group_limit_filters(query_object, all_dimensions)
+
+    assert result is None
+
+
+def test_get_group_limit_filters_different_bounds(mock_datasource: MagicMock) -> None:
+    """
+    Test filter creation when inner bounds differ.
+    """
+    all_dimensions = {
+        dim.name: dim for dim in mock_datasource.implementation.dimensions
+    }
+
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        from_dttm=datetime(2025, 10, 15),
+        to_dttm=datetime(2025, 10, 22),
+        inner_from_dttm=datetime(2025, 9, 22),  # Different (30 days)
+        inner_to_dttm=datetime(2025, 10, 22),
+        metrics=["total_sales"],
+        columns=["category"],
+        granularity="order_date",
+    )
+
+    result = _get_group_limit_filters(query_object, all_dimensions)
+
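+    # Filters are built from the inner bounds (the expanded window), not the outer from/to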
+    assert result == {
+        Filter(
+            type=PredicateType.WHERE,
+            column=all_dimensions["order_date"],
+            operator=Operator.GREATER_THAN_OR_EQUAL,
+            value=datetime(2025, 9, 22),
+        ),
+        Filter(
+            type=PredicateType.WHERE,
+            column=all_dimensions["order_date"],
+            operator=Operator.LESS_THAN,
+            value=datetime(2025, 10, 22),
+        ),
+    }
+
+
+def test_get_group_limit_filters_with_extras(mock_datasource: MagicMock) -> None:
+    """
+    Test that extras filters are included in group limit filters.
+    """
+    all_dimensions = {
+        dim.name: dim for dim in mock_datasource.implementation.dimensions
+    }
+
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        from_dttm=datetime(2025, 10, 15),
+        to_dttm=datetime(2025, 10, 22),
+        inner_from_dttm=datetime(2025, 9, 22),
+        inner_to_dttm=datetime(2025, 10, 22),
+        metrics=["total_sales"],
+        columns=["category"],
+        granularity="order_date",
+        extras={"where": "customer_id > 100"},
+    )
+
+    result = _get_group_limit_filters(query_object, all_dimensions)
+
+    assert result == {
+        Filter(
+            type=PredicateType.WHERE,
+            column=all_dimensions["order_date"],
+            operator=Operator.GREATER_THAN_OR_EQUAL,
+            value=datetime(2025, 9, 22),
+        ),
+        Filter(
+            type=PredicateType.WHERE,
+            column=all_dimensions["order_date"],
+            operator=Operator.LESS_THAN,
+            value=datetime(2025, 10, 22),
+        ),
+        AdhocFilter(
+            type=PredicateType.WHERE,
+            definition="customer_id > 100",
+        ),
+    }
+
+
+def test_map_query_object_basic(mock_datasource: MagicMock) -> None:
+    """
+    Test basic query object mapping.
+    """
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        from_dttm=datetime(2025, 10, 15),
+        to_dttm=datetime(2025, 10, 22),
+        metrics=["total_sales"],
+        columns=["category"],
+        granularity="order_date",
+        row_limit=100,
+        row_offset=10,
+    )
+
+    result = map_query_object(query_object)
+
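+    # The time range maps to an inclusive lower bound (>=) and an exclusive upper bound (<)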
+    assert result == [
+        SemanticQuery(
+            metrics=[
+                Metric(
+                    id="orders.total_sales",
+                    name="total_sales",
+                    type=NUMBER,
+                    definition="SUM(amount)",
+                    description="Total sales",
+                ),
+            ],
+            dimensions=[
+                Dimension(
+                    id="products.category",
+                    name="category",
+                    type=STRING,
+                    definition="category",
+                    description="Product category",
+                    grain=None,
+                ),
+            ],
+            filters={
+                Filter(
+                    type=PredicateType.WHERE,
+                    column=Dimension(
+                        id="orders.order_date",
+                        name="order_date",
+                        type=STRING,
+                        definition="order_date",
+                        description="Order date",
+                        grain=None,
+                    ),
+                    operator=Operator.GREATER_THAN_OR_EQUAL,
+                    value=datetime(2025, 10, 15, 0, 0),
+                ),
+                Filter(
+                    type=PredicateType.WHERE,
+                    column=Dimension(
+                        id="orders.order_date",
+                        name="order_date",
+                        type=STRING,
+                        definition="order_date",
+                        description="Order date",
+                        grain=None,
+                    ),
+                    operator=Operator.LESS_THAN,
+                    value=datetime(2025, 10, 22, 0, 0),
+                ),
+            },
+            order=[],
+            limit=100,
+            offset=10,
+            group_limit=None,
+        )
+    ]
+
+
+def test_map_query_object_with_time_offsets(mock_datasource: MagicMock) -> None:
+    """
+    Test mapping with time offsets.
+    """
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        from_dttm=datetime(2025, 10, 15),
+        to_dttm=datetime(2025, 10, 22),
+        metrics=["total_sales"],
+        columns=["category"],
+        granularity="order_date",
+        time_offsets=["1 week ago", "1 month ago"],
+    )
+
+    result = map_query_object(query_object)
+
+    # Should have 3 queries: main + 2 offsets
+    assert len(result) == 3
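+    # Each offset query keeps the same 7-day window but shifts it back by the offset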
+    assert result[0].filters == {
+        Filter(
+            type=PredicateType.WHERE,
+            column=Dimension(
+                id="orders.order_date",
+                name="order_date",
+                type=STRING,
+                definition="order_date",
+                description="Order date",
+                grain=None,
+            ),
+            operator=Operator.GREATER_THAN_OR_EQUAL,
+            value=datetime(2025, 10, 15, 0, 0),
+        ),
+        Filter(
+            type=PredicateType.WHERE,
+            column=Dimension(
+                id="orders.order_date",
+                name="order_date",
+                type=STRING,
+                definition="order_date",
+                description="Order date",
+                grain=None,
+            ),
+            operator=Operator.LESS_THAN,
+            value=datetime(2025, 10, 22, 0, 0),
+        ),
+    }
+    assert result[1].filters == {
+        Filter(
+            type=PredicateType.WHERE,
+            column=Dimension(
+                id="orders.order_date",
+                name="order_date",
+                type=STRING,
+                definition="order_date",
+                description="Order date",
+                grain=None,
+            ),
+            operator=Operator.GREATER_THAN_OR_EQUAL,
+            value=datetime(2025, 10, 8, 0, 0),
+        ),
+        Filter(
+            type=PredicateType.WHERE,
+            column=Dimension(
+                id="orders.order_date",
+                name="order_date",
+                type=STRING,
+                definition="order_date",
+                description="Order date",
+                grain=None,
+            ),
+            operator=Operator.LESS_THAN,
+            value=datetime(2025, 10, 15, 0, 0),
+        ),
+    }
+    assert result[2].filters == {
+        Filter(
+            type=PredicateType.WHERE,
+            column=Dimension(
+                id="orders.order_date",
+                name="order_date",
+                type=STRING,
+                definition="order_date",
+                description="Order date",
+                grain=None,
+            ),
+            operator=Operator.GREATER_THAN_OR_EQUAL,
+            value=datetime(2025, 9, 15, 0, 0),
+        ),
+        Filter(
+            type=PredicateType.WHERE,
+            column=Dimension(
+                id="orders.order_date",
+                name="order_date",
+                type=STRING,
+                definition="order_date",
+                description="Order date",
+                grain=None,
+            ),
+            operator=Operator.LESS_THAN,
+            value=datetime(2025, 9, 22, 0, 0),
+        ),
+    }
+
+
+def test_convert_query_object_filter_unknown_operator(
+    mock_datasource: MagicMock,
+) -> None:
+    """
+    Test filter with unknown operator returns None.
+    """
+    all_dimensions = {
+        dim.name: dim for dim in mock_datasource.implementation.dimensions
+    }
+
+    filter_: ValidatedQueryObjectFilterClause = {
+        "op": "UNKNOWN_OPERATOR",
+        "col": "category",
+        "val": "Electronics",
+    }
+
+    result = _convert_query_object_filter(filter_, all_dimensions)
+
+    assert result is None
+
+
+def test_validate_query_object_undefined_metric_error(
+    mock_datasource: MagicMock,
+) -> None:
+    """
+    Test validation error for undefined metrics.
+    """
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        metrics=["undefined_metric"],
+        columns=["order_date"],
+    )
+
+    with pytest.raises(ValueError, match="All metrics must be defined"):
+        validate_query_object(query_object)
+
+
+def test_validate_query_object_undefined_dimension_error(
+    mock_datasource: MagicMock,
+) -> None:
+    """
+    Test validation error for undefined dimensions.
+    """
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        metrics=["total_sales"],
+        columns=["undefined_dimension"],
+    )
+
+    with pytest.raises(ValueError, match="All dimensions must be defined"):
+        validate_query_object(query_object)
+
+
+def test_validate_query_object_time_grain_without_column_error(
+    mock_datasource: MagicMock,
+) -> None:
+    """
+    Test validation error when time grain provided without time column.
+    """
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        metrics=["total_sales"],
+        columns=["order_date", "category"],
+        granularity=None,  # No time column
+        extras={"time_grain_sqla": "P1D"},
+    )
+
+    with pytest.raises(ValueError, match="time column must be specified"):
+        validate_query_object(query_object)
+
+
+def test_validate_query_object_unsupported_time_grain_error(
+    mock_datasource: MagicMock,
+) -> None:
+    """
+    Test validation error for unsupported time grain.
+    """
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        metrics=["total_sales"],
+        columns=["order_date", "category"],
+        granularity="order_date",
+        extras={"time_grain_sqla": "P1Y"},  # Year grain not supported
+    )
+
+    with pytest.raises(
+        ValueError,
+        match=(
+            "The time grain is not supported for the time column in the 
Semantic View."
+        ),
+    ):
+        validate_query_object(query_object)
+
+
+def test_validate_query_object_group_limit_not_supported_error(
+    mocker: MockerFixture,
+) -> None:
+    """
+    Test validation error when group limit not supported.
+    """
+    mock_datasource = mocker.Mock()
+    time_dim = Dimension("order_date", "order_date", STRING, "order_date", "Date")
+    category_dim = Dimension("category", "category", STRING, "category", "Category")
+    sales_metric = Metric("total_sales", "total_sales", NUMBER, "SUM(amount)", "Sales")
+
+    mock_datasource.implementation.dimensions = {time_dim, category_dim}
+    mock_datasource.implementation.metrics = {sales_metric}
+    mock_datasource.implementation.features = frozenset()  # No GROUP_LIMIT feature
+
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        metrics=["total_sales"],
+        columns=["order_date", "category"],
+        series_columns=["category"],
+        series_limit=10,
+    )
+
+    with pytest.raises(ValueError, match="Group limit is not supported"):
+        validate_query_object(query_object)
+
+
+def test_validate_query_object_undefined_series_column_error(
+    mock_datasource: MagicMock,
+) -> None:
+    """
+    Test validation error for undefined series columns.
+    """
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        metrics=["total_sales"],
+        columns=["order_date", "category"],
+        series_columns=["undefined_column"],
+        series_limit=10,
+    )
+
+    with pytest.raises(ValueError, match="All series columns must be defined"):
+        validate_query_object(query_object)
+
+
+@pytest.mark.parametrize(
+    "filter_op, expected_operator",
+    [
+        ("==", Operator.EQUALS),
+        ("!=", Operator.NOT_EQUALS),
+        ("<", Operator.LESS_THAN),
+        (">", Operator.GREATER_THAN),
+        ("<=", Operator.LESS_THAN_OR_EQUAL),
+        (">=", Operator.GREATER_THAN_OR_EQUAL),
+    ],
+)
+def test_convert_query_object_filter(
+    filter_op: str,
+    expected_operator: Operator,
+) -> None:
+    """
+    Test filter with different operators.
+    """
+    all_dimensions = {
+        "category": Dimension("category", "category", STRING, "category", 
"Category")
+    }
+
+    filter_: ValidatedQueryObjectFilterClause = {
+        "op": filter_op,
+        "col": "category",
+        "val": "Electronics",
+    }
+
+    result = _convert_query_object_filter(filter_, all_dimensions)
+
+    assert result == {
+        Filter(
+            type=PredicateType.WHERE,
+            column=all_dimensions["category"],
+            operator=expected_operator,
+            value="Electronics",
+        )
+    }
+
+
+def test_convert_query_object_filter_like() -> None:
+    """
+    Test filter with LIKE operator.
+    """
+    all_dimensions = {"name": Dimension("name", "name", STRING, "name", 
"Name")}
+
+    filter_: ValidatedQueryObjectFilterClause = {
+        "op": "LIKE",
+        "col": "name",
+        "val": "%test%",
+    }
+
+    result = _convert_query_object_filter(filter_, all_dimensions)
+
+    assert result == {
+        Filter(
+            type=PredicateType.WHERE,
+            column=all_dimensions["name"],
+            operator=Operator.LIKE,
+            value="%test%",
+        )
+    }
+
+
+def test_get_results_without_time_offsets(
+    mock_datasource: MagicMock,
+    mocker: MockerFixture,
+) -> None:
+    """
+    Test get_results without time offsets returns main query result.
+    """
+    # Create mock dataframe for main query
+    main_df = pd.DataFrame(
+        {
+            "category": ["Electronics", "Books", "Clothing"],
+            "total_sales": [1000.0, 500.0, 750.0],
+        }
+    )
+
+    # Mock the semantic view's get_dataframe method
+    mock_result = SemanticResult(
+        requests=[
+            SemanticRequest(
+                type="SQL",
+                definition="SELECT category, SUM(amount) FROM orders GROUP BY 
category",
+            )
+        ],
+        results=main_df,
+    )
+
+    mock_datasource.implementation.get_dataframe = mocker.Mock(return_value=mock_result)
+
+    # Create query object without time offsets
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        from_dttm=datetime(2025, 10, 15),
+        to_dttm=datetime(2025, 10, 22),
+        metrics=["total_sales"],
+        columns=["category"],
+        granularity="order_date",
+    )
+
+    # Call get_results
+    result = get_results(query_object)
+
+    # Verify result is a QueryResult
+    assert result.df is not None
+    assert "SQL" in result.query
+
+    # Verify DataFrame matches main query result
+    pd.testing.assert_frame_equal(result.df, main_df)
+
+
+def test_get_results_with_single_time_offset(
+    mock_datasource: MagicMock,
+    mocker: MockerFixture,
+) -> None:
+    """
+    Test get_results with a single time offset joins correctly.
+    """
+    # Create mock dataframes
+    main_df = pd.DataFrame(
+        {
+            "category": ["Electronics", "Books", "Clothing"],
+            "total_sales": [1000.0, 500.0, 750.0],
+        }
+    )
+
+    offset_df = pd.DataFrame(
+        {
+            "category": ["Electronics", "Books", "Clothing"],
+            "total_sales": [950.0, 480.0, 700.0],
+        }
+    )
+
+    # Mock the semantic view's get_dataframe method
+    # It will be called twice: once for main, once for offset
+    mock_main_result = SemanticResult(
+        requests=[
+            SemanticRequest(
+                type="SQL",
+                definition=(
+                    "SELECT category, SUM(amount) FROM orders "
+                    "WHERE date >= '2025-10-15' GROUP BY category"
+                ),
+            )
+        ],
+        results=main_df.copy(),
+    )
+
+    mock_offset_result = SemanticResult(
+        requests=[
+            SemanticRequest(
+                type="SQL",
+                definition=(
+                    "SELECT category, SUM(amount) FROM orders "
+                    "WHERE date >= '2025-10-08' GROUP BY category"
+                ),
+            )
+        ],
+        results=offset_df.copy(),
+    )
+
+    mock_datasource.implementation.get_dataframe = mocker.Mock(
+        side_effect=[mock_main_result, mock_offset_result]
+    )
+
+    # Create query object with time offset
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        from_dttm=datetime(2025, 10, 15),
+        to_dttm=datetime(2025, 10, 22),
+        metrics=["total_sales"],
+        columns=["category"],
+        granularity="order_date",
+        time_offsets=["1 week ago"],
+    )
+
+    # Call get_results
+    result = get_results(query_object)
+
+    # Verify result structure - QueryResult with query containing both SQL statements
+    assert result.df is not None
+    assert "SQL" in result.query
+
+    # Verify DataFrame has both main and offset metrics
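+    # Offset metrics are suffixed with "__<offset>" and joined back onto the main result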
+    expected_df = pd.DataFrame(
+        {
+            "category": ["Electronics", "Books", "Clothing"],
+            "total_sales": [1000.0, 500.0, 750.0],
+            "total_sales__1 week ago": [950.0, 480.0, 700.0],
+        }
+    )
+
+    pd.testing.assert_frame_equal(result.df, expected_df)
+
+
+def test_get_results_with_multiple_time_offsets(
+    mock_datasource: MagicMock,
+    mocker: MockerFixture,
+) -> None:
+    """
+    Test get_results with multiple time offsets joins all correctly.
+    """
+    # Create mock dataframes
+    main_df = pd.DataFrame(
+        {
+            "region": ["US", "UK", "JP"],
+            "order_count": [100, 50, 75],
+        }
+    )
+
+    offset_1w_df = pd.DataFrame(
+        {
+            "region": ["US", "UK", "JP"],
+            "order_count": [95, 48, 70],
+        }
+    )
+
+    offset_1m_df = pd.DataFrame(
+        {
+            "region": ["US", "UK", "JP"],
+            "order_count": [80, 40, 60],
+        }
+    )
+
+    # Mock results
+    mock_main_result = SemanticResult(
+        requests=[SemanticRequest(type="SQL", definition="MAIN QUERY")],
+        results=main_df.copy(),
+    )
+
+    mock_offset_1w_result = SemanticResult(
+        requests=[SemanticRequest(type="SQL", definition="OFFSET 1W QUERY")],
+        results=offset_1w_df.copy(),
+    )
+
+    mock_offset_1m_result = SemanticResult(
+        requests=[SemanticRequest(type="SQL", definition="OFFSET 1M QUERY")],
+        results=offset_1m_df.copy(),
+    )
+
+    mock_datasource.implementation.get_dataframe = mocker.Mock(
+        side_effect=[mock_main_result, mock_offset_1w_result, 
mock_offset_1m_result]
+    )
+
+    # Create query object with multiple time offsets
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        from_dttm=datetime(2025, 10, 15),
+        to_dttm=datetime(2025, 10, 22),
+        metrics=["order_count"],
+        columns=["region"],
+        granularity="order_date",
+        time_offsets=["1 week ago", "1 month ago"],
+    )
+
+    # Call get_results
+    result = get_results(query_object)
+
+    # Verify result structure - QueryResult with combined query strings
+    assert result.df is not None
+    assert "MAIN QUERY" in result.query
+    assert "OFFSET 1W QUERY" in result.query
+    assert "OFFSET 1M QUERY" in result.query
+
+    # Verify DataFrame has all metrics
+    expected_df = pd.DataFrame(
+        {
+            "region": ["US", "UK", "JP"],
+            "order_count": [100, 50, 75],
+            "order_count__1 week ago": [95, 48, 70],
+            "order_count__1 month ago": [80, 40, 60],
+        }
+    )
+
+    pd.testing.assert_frame_equal(result.df, expected_df)
+
+
+def test_get_results_with_empty_offset_result(
+    mock_datasource: MagicMock,
+    mocker: MockerFixture,
+) -> None:
+    """
+    Test get_results handles empty offset results gracefully.
+    """
+    # Create mock dataframes
+    main_df = pd.DataFrame(
+        {
+            "category": ["Electronics", "Books"],
+            "total_sales": [1000.0, 500.0],
+        }
+    )
+
+    # Empty offset result
+    offset_df = pd.DataFrame()
+
+    # Mock results
+    mock_main_result = SemanticResult(
+        requests=[SemanticRequest(type="SQL", definition="MAIN QUERY")],
+        results=main_df.copy(),
+    )
+
+    mock_offset_result = SemanticResult(
+        requests=[SemanticRequest(type="SQL", definition="OFFSET QUERY")],
+        results=offset_df,
+    )
+
+    mock_datasource.implementation.get_dataframe = mocker.Mock(
+        side_effect=[mock_main_result, mock_offset_result]
+    )
+
+    # Create query object with time offset
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        from_dttm=datetime(2025, 10, 15),
+        to_dttm=datetime(2025, 10, 22),
+        metrics=["total_sales"],
+        columns=["category"],
+        granularity="order_date",
+        time_offsets=["1 week ago"],
+    )
+
+    # Call get_results
+    result = get_results(query_object)
+
+    # Verify result structure
+    assert result.df is not None
+    assert "MAIN QUERY" in result.query
+    assert "OFFSET QUERY" in result.query
+
+    # Verify DataFrame has NaN for missing offset data
+    assert "total_sales__1 week ago" in result.df.columns
+    assert result.df["total_sales__1 week ago"].isna().all()
+
+
+def test_get_results_with_partial_offset_match(
+    mock_datasource: MagicMock,
+    mocker: MockerFixture,
+) -> None:
+    """
+    Test get_results with partial matches in offset data (left join behavior).
+    """
+    # Main query has 3 categories
+    main_df = pd.DataFrame(
+        {
+            "category": ["Electronics", "Books", "Clothing"],
+            "total_sales": [1000.0, 500.0, 750.0],
+        }
+    )
+
+    # Offset query only has 2 categories (Books missing)
+    offset_df = pd.DataFrame(
+        {
+            "category": ["Electronics", "Clothing"],
+            "total_sales": [950.0, 700.0],
+        }
+    )
+
+    # Mock results
+    mock_main_result = SemanticResult(
+        requests=[SemanticRequest(type="SQL", definition="MAIN QUERY")],
+        results=main_df.copy(),
+    )
+
+    mock_offset_result = SemanticResult(
+        requests=[SemanticRequest(type="SQL", definition="OFFSET QUERY")],
+        results=offset_df.copy(),
+    )
+
+    mock_datasource.implementation.get_dataframe = mocker.Mock(
+        side_effect=[mock_main_result, mock_offset_result]
+    )
+
+    # Create query object
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        from_dttm=datetime(2025, 10, 15),
+        to_dttm=datetime(2025, 10, 22),
+        metrics=["total_sales"],
+        columns=["category"],
+        granularity="order_date",
+        time_offsets=["1 week ago"],
+    )
+
+    # Call get_results
+    result = get_results(query_object)
+
+    # Verify DataFrame structure
+    expected_df = pd.DataFrame(
+        {
+            "category": ["Electronics", "Books", "Clothing"],
+            "total_sales": [1000.0, 500.0, 750.0],
+            "total_sales__1 week ago": [950.0, None, 700.0],
+        }
+    )
+
+    pd.testing.assert_frame_equal(result.df, expected_df)
+
+
+def test_get_results_with_multiple_dimensions(
+    mock_datasource: MagicMock,
+    mocker: MockerFixture,
+) -> None:
+    """
+    Test get_results with multiple dimension columns in join.
+    """
+    # Create mock dataframes with multiple dimensions
+    main_df = pd.DataFrame(
+        {
+            "category": ["Electronics", "Electronics", "Books"],
+            "region": ["US", "UK", "US"],
+            "total_sales": [1000.0, 800.0, 500.0],
+        }
+    )
+
+    offset_df = pd.DataFrame(
+        {
+            "category": ["Electronics", "Electronics", "Books"],
+            "region": ["US", "UK", "US"],
+            "total_sales": [950.0, 780.0, 480.0],
+        }
+    )
+
+    # Mock results
+    mock_main_result = SemanticResult(
+        requests=[SemanticRequest(type="SQL", definition="MAIN QUERY")],
+        results=main_df.copy(),
+    )
+
+    mock_offset_result = SemanticResult(
+        requests=[SemanticRequest(type="SQL", definition="OFFSET QUERY")],
+        results=offset_df.copy(),
+    )
+
+    mock_datasource.implementation.get_dataframe = mocker.Mock(
+        side_effect=[mock_main_result, mock_offset_result]
+    )
+
+    # Create query object with multiple dimensions
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        from_dttm=datetime(2025, 10, 15),
+        to_dttm=datetime(2025, 10, 22),
+        metrics=["total_sales"],
+        columns=["category", "region"],
+        granularity="order_date",
+        time_offsets=["1 week ago"],
+    )
+
+    # Call get_results
+    result = get_results(query_object)
+
+    # Verify DataFrame structure - join should be on both category and region
+    expected_df = pd.DataFrame(
+        {
+            "category": ["Electronics", "Electronics", "Books"],
+            "region": ["US", "UK", "US"],
+            "total_sales": [1000.0, 800.0, 500.0],
+            "total_sales__1 week ago": [950.0, 780.0, 480.0],
+        }
+    )
+
+    pd.testing.assert_frame_equal(result.df, expected_df)
