korbit-ai[bot] commented on code in PR #34844:
URL: https://github.com/apache/superset/pull/34844#discussion_r2298322365
##########
docker/pythonpath_dev/superset_config.py:
##########
@@ -99,7 +99,10 @@ class CeleryConfig:
CELERY_CONFIG = CeleryConfig
-FEATURE_FLAGS = {"ALERT_REPORTS": True}
+FEATURE_FLAGS = {
+ "ALERT_REPORTS": True,
+ "DATASET_SHADOW_WRITING_ENABLED": True,
+}
Review Comment:
### Shadow Writing Flag Lacks Runtime Control <sub></sub>
<details>
<summary>Tell me more</summary>
###### What is the issue?
The DATASET_SHADOW_WRITING_ENABLED flag is permanently enabled in the
development configuration without any fallback mechanism or environment
variable control.
###### Why this matters
In a development environment, having shadow writing permanently enabled
could cause unnecessary data duplication and performance overhead when testing
other features. It may also mask potential issues that could occur when the
feature is disabled in production.
###### Suggested change ∙ *Feature Preview*
Modify the code to allow runtime control of the shadow writing feature:
```python
FEATURE_FLAGS = {
    "ALERT_REPORTS": True,
    "DATASET_SHADOW_WRITING_ENABLED": os.getenv(
        "DATASET_SHADOW_WRITING_ENABLED", "true"
    ).lower() == "true",
}
```
###### Provide feedback to improve future suggestions
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/5268d3a9-7f35-4c5d-9dfa-c54f89edb010/upvote)
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/5268d3a9-7f35-4c5d-9dfa-c54f89edb010?what_not_true=true)
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/5268d3a9-7f35-4c5d-9dfa-c54f89edb010?what_out_of_scope=true)
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/5268d3a9-7f35-4c5d-9dfa-c54f89edb010?what_not_in_standard=true)
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/5268d3a9-7f35-4c5d-9dfa-c54f89edb010)
</details>
<sub>
💬 Looking for more details? Reply to this comment to chat with Korbit.
</sub>
<!--- korbi internal id:7524f0a4-5271-420c-9885-2cb56dc401c5 -->
[](7524f0a4-5271-420c-9885-2cb56dc401c5)
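For reference, here is a minimal sketch of what the complete development config block could look like with the override in place, including the `import os` it relies on (the Docker `superset_config.py` typically already imports `os`); the default value and environment variable name simply follow the suggestion above and are otherwise assumptions:

```python
import os

FEATURE_FLAGS = {
    "ALERT_REPORTS": True,
    # Enabled by default for development, but can be switched off at runtime,
    # e.g. by exporting DATASET_SHADOW_WRITING_ENABLED=false in the container
    # environment, without editing this file.
    "DATASET_SHADOW_WRITING_ENABLED": os.getenv(
        "DATASET_SHADOW_WRITING_ENABLED", "true"
    ).lower() == "true",
}
```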
##########
superset/models/dataset.py:
##########
@@ -0,0 +1,261 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""New dataset models following SIP-68 specification"""
+
+from __future__ import annotations
+
+import uuid
+from typing import Any, Optional
+
+from flask_appbuilder import Model
+from flask_appbuilder.security.sqla.models import User
+from sqlalchemy import (
+ Boolean,
+ Column,
+ DateTime,
+ Enum,
+ ForeignKey,
+ Integer,
+ String,
+ Table as SqlaTable,
+ Text,
+)
+from sqlalchemy.dialects.postgresql import UUID
+from sqlalchemy.ext.hybrid import hybrid_property
+from sqlalchemy.orm import Mapped, relationship
+from sqlalchemy.sql import func
+from sqlalchemy.types import JSON
+
+from superset.models.helpers import AuditMixinNullable, ImportExportMixin, UUIDMixin
+from superset.utils import core as utils
+from superset.utils.backports import StrEnum
+
+metadata = Model.metadata
+
+
+class DatasetKind(StrEnum):
+ PHYSICAL = "physical"
+ VIRTUAL = "virtual"
+
+
+# Association table for Dataset-Table many-to-many relationship
+dataset_table_association = SqlaTable(
+ "sip68_dataset_table_association",
+ metadata,
+ Column("dataset_id", Integer, ForeignKey("sip68_datasets.id",
ondelete="CASCADE")),
+ Column("table_id", Integer, ForeignKey("sip68_tables.id",
ondelete="CASCADE")),
+)
+
+# Association table for Dataset-User (owners) many-to-many relationship
+dataset_user_association = SqlaTable(
+ "sip68_dataset_user_association",
+ metadata,
+ Column("dataset_id", Integer, ForeignKey("sip68_datasets.id",
ondelete="CASCADE")),
+ Column("user_id", Integer, ForeignKey("ab_user.id", ondelete="CASCADE")),
+)
+
+
+class Table(Model, AuditMixinNullable, ImportExportMixin, UUIDMixin):
+ """
+ Represents a physical table or view in a database.
+ This model stores the basic table metadata without any semantic enrichment.
+ """
+
+ __tablename__ = "sip68_tables"
+
+ id = Column(Integer, primary_key=True)
+ uuid = Column(UUID(as_uuid=True), default=uuid.uuid4, unique=True, nullable=False)
+
+ # Database connection
+ database_id = Column(Integer, ForeignKey("dbs.id"), nullable=False)
+
+ # Table location
+ catalog = Column(String(256), nullable=True)
+ schema = Column(String(255), nullable=True)
+ name = Column(String(250), nullable=False)
+
+ # Relationships
+ database = relationship("Database", back_populates="new_tables")
+ columns: Mapped[list[Column]] = relationship(
+ "Column",
+ back_populates="table",
+ cascade="all, delete-orphan",
+ foreign_keys="Column.table_id",
+ )
+ datasets = relationship("Dataset", secondary=dataset_table_association,
back_populates="tables")
+
+ @hybrid_property
+ def full_name(self) -> str:
+ """Return the fully qualified table name"""
+ parts = []
+ if self.catalog:
+ parts.append(self.catalog)
+ if self.schema:
+ parts.append(self.schema)
+ parts.append(self.name)
+ return ".".join(parts)
+
+ def __repr__(self) -> str:
+ return f"<Table {self.full_name}>"
+
+
+class Dataset(Model, AuditMixinNullable, ImportExportMixin, UUIDMixin):
+ """
+ Represents a dataset - the semantic layer on top of physical tables.
+ Datasets can be physical (1:1 with a table) or virtual (based on SQL expressions).
+ """
+
+ __tablename__ = "sip68_datasets"
+
+ id = Column(Integer, primary_key=True)
+ uuid = Column(UUID(as_uuid=True), default=uuid.uuid4, unique=True, nullable=False)
+
+ # Core dataset properties
+ name = Column(String(250), nullable=False)
+ kind = Column(Enum(DatasetKind), nullable=False, default=DatasetKind.PHYSICAL)
+
+ # SQL expression defining the dataset
+ # For physical datasets: simple table name
+ # For virtual datasets: complex SQL query
+ expression = Column(utils.MediumText(), nullable=False)
+
+ # Metadata
+ description = Column(Text)
+ default_endpoint = Column(Text)
+ is_featured = Column(Boolean, default=False)
+ filter_select_enabled = Column(Boolean, default=True)
+ offset = Column("offset", Integer, default=0)
+ cache_timeout = Column(Integer)
+ params = Column(Text)
+ extra = Column(Text)
+
+ # Dataset-specific configuration
+ main_dttm_col = Column(String(250))
+ fetch_values_predicate = Column(Text)
+ normalize_columns = Column(Boolean, default=False)
+ always_filter_main_dttm = Column(Boolean, default=False)
+ sql = Column(utils.MediumText()) # For backward compatibility with virtual datasets
+
+ # Relationships
+ owners = relationship(User, secondary=dataset_user_association, backref="owned_datasets")
+ tables = relationship("Table", secondary=dataset_table_association, back_populates="datasets")
+ columns: Mapped[list[Column]] = relationship(
+ "Column",
+ back_populates="dataset",
+ cascade="all, delete-orphan",
+ foreign_keys="Column.dataset_id",
+ )
+
+ @hybrid_property
+ def is_virtual(self) -> bool:
+ return self.kind == DatasetKind.VIRTUAL
+
+ @hybrid_property
+ def is_physical(self) -> bool:
+ return self.kind == DatasetKind.PHYSICAL
+
+ def __repr__(self) -> str:
+ return f"<Dataset {self.name} ({self.kind})>"
+
+
+class Column(Model, AuditMixinNullable, ImportExportMixin, UUIDMixin):
+ """
+ Unified column model representing:
+ 1. Physical table columns
+ 2. Dataset metrics (aggregations)
+ 3. Derived/computed columns
+ """
+
+ __tablename__ = "sip68_columns"
+
+ id = Column(Integer, primary_key=True)
+ uuid = Column(UUID(as_uuid=True), default=uuid.uuid4, unique=True, nullable=False)
+
+ # Core column properties
+ name = Column(String(255), nullable=False)
+ type = Column("type", String(32)) # SQL type or "metric"
+
+ # SQL expression defining the column
+ # For table columns: column name (e.g., "user_id")
+ # For metrics: aggregation expression (e.g., "COUNT(*)")
+ # For derived columns: SQL expression (e.g., "revenue - cost")
+ expression = Column(utils.MediumText(), nullable=False)
+
+ # Metadata
+ verbose_name = Column(String(1024))
+ description = Column(utils.MediumText())
+ warning_text = Column(Text)
+ units = Column(String(128))
+
+ # Column properties
+ is_active = Column(Boolean, default=True)
+ is_temporal = Column(Boolean, default=False)
+ is_spatial = Column(Boolean, default=False)
+ is_partition = Column(Boolean, default=False)
+ is_aggregation = Column(Boolean, default=False) # True for metrics
+ is_additive = Column(Boolean, default=True)
+
+ # For exploration
+ groupby = Column(Boolean, default=True)
+ filterable = Column(Boolean, default=True)
+
+ # Data quality metrics (can be computed offline)
+ cardinality = Column(Integer)
+
+ # For business logic
+ increase_good = Column(Boolean, default=True) # For metrics: is increase good?
+
+ # Formatting
+ d3format = Column(String(128))
+ currency = Column(JSON, nullable=True)
+ python_date_format = Column(String(255))
+
+ # Advanced properties
+ advanced_data_type = Column(String(255))
+ extra = Column(Text)
+
+ # Foreign keys - a column can belong to either a table or a dataset
+ table_id = Column(Integer, ForeignKey("sip68_tables.id",
ondelete="CASCADE"), nullable=True)
+ dataset_id = Column(Integer, ForeignKey("sip68_datasets.id",
ondelete="CASCADE"), nullable=True)
+
+ # Relationships
+ table = relationship("Table", back_populates="columns",
foreign_keys=[table_id])
+ dataset = relationship("Dataset", back_populates="columns",
foreign_keys=[dataset_id])
+
+ @hybrid_property
+ def is_metric(self) -> bool:
+ return self.is_aggregation
+
+ @hybrid_property
+ def is_derived(self) -> bool:
+ """True if this is a computed column (not a direct table column)"""
+ return self.table_id is None and not self.is_aggregation
Review Comment:
### Incorrect Derived Column Classification <sub></sub>
<details>
<summary>Tell me more</summary>
###### What is the issue?
The is_derived property only checks that table_id is None, so a column that has
neither a table_id nor a dataset_id would also be classified as derived,
potentially misclassifying columns.
###### Why this matters
This could lead to incorrect column classification in the application,
affecting how the column is processed and displayed.
###### Suggested change ∙ *Feature Preview*
Update the is_derived property to properly check for dataset columns:
```python
@hybrid_property
def is_derived(self) -> bool:
    """True if this is a computed column (not a direct table column)"""
    return (
        self.table_id is None
        and self.dataset_id is not None
        and not self.is_aggregation
    )
```
###### Provide feedback to improve future suggestions
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/af35d67f-aec6-4e26-9a4f-c937f6d9a663/upvote)
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/af35d67f-aec6-4e26-9a4f-c937f6d9a663?what_not_true=true)
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/af35d67f-aec6-4e26-9a4f-c937f6d9a663?what_out_of_scope=true)
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/af35d67f-aec6-4e26-9a4f-c937f6d9a663?what_not_in_standard=true)
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/af35d67f-aec6-4e26-9a4f-c937f6d9a663)
</details>
<sub>
💬 Looking for more details? Reply to this comment to chat with Korbit.
</sub>
<!--- korbi internal id:6b274c93-1978-4bb1-9cd0-797051ef3483 -->
[](6b274c93-1978-4bb1-9cd0-797051ef3483)
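To make the classification rules concrete, here is a small standalone sketch; it uses a plain dataclass as a hypothetical stand-in for `superset.models.dataset.Column` so it runs without a database, and contrasts the original and suggested `is_derived` logic for a column that has neither a `table_id` nor a `dataset_id`:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeColumn:  # hypothetical stand-in, not the real SQLAlchemy model
    table_id: Optional[int] = None
    dataset_id: Optional[int] = None
    is_aggregation: bool = False

    @property
    def is_derived_original(self) -> bool:
        # Original logic: any non-aggregation column without a table is "derived".
        return self.table_id is None and not self.is_aggregation

    @property
    def is_derived_suggested(self) -> bool:
        # Suggested logic: the column must also belong to a dataset.
        return (
            self.table_id is None
            and self.dataset_id is not None
            and not self.is_aggregation
        )


orphan = FakeColumn()  # neither table_id nor dataset_id
print(orphan.is_derived_original)   # True  -> classified as derived
print(orphan.is_derived_suggested)  # False -> not derived
```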
##########
superset/connectors/sqla/models.py:
##########
@@ -1971,3 +1971,27 @@ class RowLevelSecurityFilter(Model, AuditMixinNullable):
backref="row_level_security_filters",
)
clause = Column(utils.MediumText(), nullable=False)
+
+
+# Shadow writing hooks for SIP-68 dataset migration
+from sqlalchemy import event
+
+def sync_sqla_table_after_insert(mapper, connection, target):
+ """Sync SqlaTable to new models after insert."""
+ from superset.datasets.shadow_writer import shadow_writer
+ try:
+ shadow_writer.sync_sqla_table_to_new_models(target)
+ except Exception as e:
+ logger.warning("Error syncing SqlaTable %s to new models: %s",
target.id, e)
+
+def sync_sqla_table_after_update(mapper, connection, target):
+ """Sync SqlaTable to new models after update."""
+ from superset.datasets.shadow_writer import shadow_writer
+ try:
+ shadow_writer.sync_sqla_table_to_new_models(target)
+ except Exception as e:
+ logger.warning("Error syncing SqlaTable %s to new models: %s",
target.id, e)
Review Comment:
### Duplicate sync functions violate DRY principle <sub></sub>
<details>
<summary>Tell me more</summary>
###### What is the issue?
The sync_sqla_table_after_insert and sync_sqla_table_after_update functions
duplicate the same operations.
###### Why this matters
Code duplication increases maintenance overhead and risk of inconsistencies
when changes are needed. If the sync logic needs to change, it would need to be
updated in multiple places.
###### Suggested change ∙ *Feature Preview*
Consolidate the duplicate code into a single helper function:
```python
def _sync_sqla_table(target):
    from superset.datasets.shadow_writer import shadow_writer
    try:
        shadow_writer.sync_sqla_table_to_new_models(target)
    except Exception as e:
        logger.warning("Error syncing SqlaTable %s to new models: %s", target.id, e)


def sync_sqla_table_after_insert(mapper, connection, target):
    _sync_sqla_table(target)


def sync_sqla_table_after_update(mapper, connection, target):
    _sync_sqla_table(target)
```
###### Provide feedback to improve future suggestions
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/0a42d98e-4379-478c-85bc-a2a8b0eb6dd2/upvote)
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/0a42d98e-4379-478c-85bc-a2a8b0eb6dd2?what_not_true=true)
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/0a42d98e-4379-478c-85bc-a2a8b0eb6dd2?what_out_of_scope=true)
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/0a42d98e-4379-478c-85bc-a2a8b0eb6dd2?what_not_in_standard=true)
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/0a42d98e-4379-478c-85bc-a2a8b0eb6dd2)
</details>
<sub>
💬 Looking for more details? Reply to this comment to chat with Korbit.
</sub>
<!--- korbi internal id:a02f9743-9a73-4667-958c-cbe09995c446 -->
[](a02f9743-9a73-4667-958c-cbe09995c446)
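The hunk above defines the hook functions but the excerpt does not show how they are attached to the mapper; as a hedged sketch, this is how such hooks are typically registered with SQLAlchemy's event API (placing the registration alongside the definitions in `superset/connectors/sqla/models.py` is an assumption):

```python
from sqlalchemy import event

# Register the hooks so they fire whenever a SqlaTable row is inserted or
# updated; both delegate to the shared sync logic.
event.listen(SqlaTable, "after_insert", sync_sqla_table_after_insert)
event.listen(SqlaTable, "after_update", sync_sqla_table_after_update)
```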
##########
superset/datasets/shadow_writer.py:
##########
@@ -0,0 +1,433 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Shadow writing mechanism for dataset model migration (SIP-68).
+
+This module provides functionality to keep the old SqlaTable model
+and new Dataset/Table/Column models in sync during the transition period.
+"""
+
+from __future__ import annotations
+
+import logging
+from typing import Any, Optional
+
+from flask import current_app
+
+from superset import is_feature_enabled
+from sqlalchemy import and_
+from sqlalchemy.orm import Session
+
+from superset import db
+from superset.connectors.sqla.models import SqlaTable, SqlMetric, TableColumn
+from superset.models.core import Database
+from superset.models.dataset import Column, Dataset, DatasetKind, Table
+
+logger = logging.getLogger(__name__)
+
+
+class ShadowWriter:
+ """
+ Handles synchronization between old and new dataset models.
+
+ This class implements shadow writing, ensuring that changes to either
+ the old SqlaTable model or the new Dataset/Table/Column models are
+ reflected in both representations.
+ """
+
+ def __init__(self, session: Optional[Session] = None) -> None:
+ self.session = session or db.session
+
+ def is_shadow_writing_enabled(self) -> bool:
+ """Check if shadow writing is enabled via feature flags."""
+ return is_feature_enabled("DATASET_SHADOW_WRITING_ENABLED")
+
+ def sync_sqla_table_to_new_models(self, sqla_table: SqlaTable) -> tuple[Table, Dataset]:
+ """
+ Synchronize a SqlaTable to the new Table and Dataset models.
+
+ Args:
+ sqla_table: The SqlaTable instance to sync
+
+ Returns:
+ Tuple of (Table, Dataset) instances
+ """
+ if not self.is_shadow_writing_enabled():
+ return None, None
+
+ try:
+ # Use a separate session to avoid conflicts with ongoing transactions
+ from sqlalchemy.orm import sessionmaker
+ Session = sessionmaker(bind=self.session.bind)
+ shadow_session = Session()
+
+ # Create a new shadow writer instance with the separate session
+ shadow_writer_instance = ShadowWriter(shadow_session)
+
+ # Find or create the corresponding Table
+ table = shadow_writer_instance._find_or_create_table(sqla_table)
+
+ # Find or create the corresponding Dataset
+ dataset = shadow_writer_instance._find_or_create_dataset(sqla_table, table)
+
+ # Sync columns and metrics
+ shadow_writer_instance._sync_columns_and_metrics(sqla_table, table, dataset)
+
+ # Commit the shadow session
+ shadow_session.commit()
+ shadow_session.close()
+
+ logger.debug(
+ "Synced SqlaTable %s to Table %s and Dataset %s",
+ sqla_table.id,
+ table.id if table else None,
+ dataset.id if dataset else None,
+ )
+
+ return table, dataset
+
+ except Exception as e:
+ logger.warning(
+ "Error syncing SqlaTable %s to new models: %s",
+ sqla_table.id,
+ str(e)
+ )
+ if 'shadow_session' in locals():
+ shadow_session.rollback()
+ shadow_session.close()
+ return None, None
+
+ def sync_new_models_to_sqla_table(self, dataset: Dataset) -> Optional[SqlaTable]:
+ """
+ Synchronize a Dataset back to the SqlaTable model.
+
+ Args:
+ dataset: The Dataset instance to sync
+
+ Returns:
+ The corresponding SqlaTable instance
+ """
+ if not self.is_shadow_writing_enabled():
+ return None
+
+ # For now, we only support physical datasets in reverse sync
+ if dataset.is_virtual:
+ logger.warning("Reverse sync not supported for virtual datasets
yet")
+ return None
+
+ # Get the primary table for this dataset
+ if not dataset.tables:
+ logger.warning("Dataset %s has no associated tables", dataset.id)
+ return None
+
+ primary_table = dataset.tables[0] # For physical datasets, there should be only one
+
+ # Find or create corresponding SqlaTable
+ sqla_table = self._find_or_create_sqla_table(dataset, primary_table)
+
+ # Sync properties
+ sqla_table.table_name = primary_table.name
+ sqla_table.schema = primary_table.schema
+ sqla_table.catalog = primary_table.catalog
+ sqla_table.database_id = primary_table.database_id
+ sqla_table.description = dataset.description
+ sqla_table.main_dttm_col = dataset.main_dttm_col
+ sqla_table.default_endpoint = dataset.default_endpoint
+ sqla_table.is_featured = dataset.is_featured
+ sqla_table.filter_select_enabled = dataset.filter_select_enabled
+ sqla_table.offset = dataset.offset
+ sqla_table.cache_timeout = dataset.cache_timeout
+ sqla_table.params = dataset.params
+ sqla_table.extra = dataset.extra
+ sqla_table.fetch_values_predicate = dataset.fetch_values_predicate
+
+ # Sync columns and metrics back
+ self._sync_new_columns_to_sqla_table(dataset, sqla_table)
+
+ logger.debug(
+ "Synced Dataset %s back to SqlaTable %s",
+ dataset.id,
+ sqla_table.id,
+ )
+
+ return sqla_table
+
+ def _find_or_create_table(self, sqla_table: SqlaTable) -> Table:
+ """Find or create a Table instance corresponding to a SqlaTable."""
+ # Look for existing Table
+ table = (
+ self.session.query(Table)
+ .filter(
+ and_(
+ Table.database_id == sqla_table.database_id,
+ Table.catalog == sqla_table.catalog,
+ Table.schema == sqla_table.schema,
+ Table.name == sqla_table.table_name,
+ )
+ )
+ .first()
+ )
+
+ if not table:
+ table = Table(
+ database_id=sqla_table.database_id,
+ catalog=sqla_table.catalog,
+ schema=sqla_table.schema,
+ name=sqla_table.table_name,
+ )
+ self.session.add(table)
+ self.session.flush() # Get the ID
+
+ return table
+
+ def _find_or_create_dataset(self, sqla_table: SqlaTable, table: Table) -> Dataset:
+ """Find or create a Dataset instance corresponding to a SqlaTable."""
+ # For physical datasets, use the table name as dataset name
+ # For virtual datasets, use the existing table_name (which contains the custom name)
+
+ kind = DatasetKind.VIRTUAL if sqla_table.sql else DatasetKind.PHYSICAL
+ name = sqla_table.table_name
+ expression = sqla_table.sql or sqla_table.table_name
+
+ # Look for existing Dataset
+ # We'll use a combination of name and expression to find existing datasets
+ dataset = (
+ self.session.query(Dataset)
+ .filter(
+ and_(
+ Dataset.name == name,
+ Dataset.expression == expression,
+ )
+ )
+ .first()
+ )
+
+ if not dataset:
+ dataset = Dataset(
+ name=name,
+ kind=kind,
+ expression=expression,
+ description=sqla_table.description,
+ main_dttm_col=sqla_table.main_dttm_col,
+ default_endpoint=sqla_table.default_endpoint,
+ is_featured=sqla_table.is_featured,
+ filter_select_enabled=sqla_table.filter_select_enabled,
+ offset=sqla_table.offset,
+ cache_timeout=sqla_table.cache_timeout,
+ params=sqla_table.params,
+ extra=sqla_table.extra,
+ fetch_values_predicate=sqla_table.fetch_values_predicate,
+ sql=sqla_table.sql, # For backward compatibility
+ )
+ self.session.add(dataset)
+ self.session.flush() # Get the ID
+
+ # Associate with the table
+ if kind == DatasetKind.PHYSICAL:
+ dataset.tables.append(table)
+
+ return dataset
+
+ def _find_or_create_sqla_table(self, dataset: Dataset, table: Table) -> SqlaTable:
+ """Find or create a SqlaTable instance corresponding to a Dataset."""
+ # Look for existing SqlaTable
+ sqla_table = (
+ self.session.query(SqlaTable)
+ .filter(
+ and_(
+ SqlaTable.database_id == table.database_id,
+ SqlaTable.catalog == table.catalog,
+ SqlaTable.schema == table.schema,
+ SqlaTable.table_name == table.name,
+ )
+ )
+ .first()
+ )
+
+ if not sqla_table:
+ sqla_table = SqlaTable(
+ table_name=table.name,
+ schema=table.schema,
+ catalog=table.catalog,
+ database_id=table.database_id,
+ )
+ self.session.add(sqla_table)
+ self.session.flush() # Get the ID
+
+ return sqla_table
+
+ def _sync_columns_and_metrics(
+ self, sqla_table: SqlaTable, table: Table, dataset: Dataset
+ ) -> None:
+ """Sync columns and metrics from SqlaTable to new models."""
+
+ # Sync table columns to Column model (table-level)
+ for table_column in sqla_table.columns:
+ if table_column.is_active: # Only sync active columns
+ self._sync_table_column(table_column, table)
+
+ # Sync columns to Column model (dataset-level) - these are dataset-specific column configs
+ for table_column in sqla_table.columns:
+ if table_column.is_active:
+ self._sync_dataset_column(table_column, dataset)
+
+ # Sync metrics to Column model (dataset-level)
+ for metric in sqla_table.metrics:
+ self._sync_metric(metric, dataset)
+
+ def _sync_table_column(self, table_column: TableColumn, table: Table) -> Column:
+ """Sync a TableColumn to a table-level Column."""
+ # Look for existing column
+ column = (
+ self.session.query(Column)
+ .filter(
+ and_(
+ Column.table_id == table.id,
+ Column.name == table_column.column_name,
+ )
+ )
+ .first()
+ )
+
+ if not column:
+ column = Column(
+ table_id=table.id,
+ name=table_column.column_name,
+ type=table_column.type,
+ expression=table_column.expression or table_column.column_name,
+ is_temporal=table_column.is_dttm,
+ python_date_format=table_column.python_date_format,
+ advanced_data_type=table_column.advanced_data_type,
+ extra=table_column.extra,
+ )
+ self.session.add(column)
+ else:
+ # Update existing column
+ column.type = table_column.type
+ column.expression = table_column.expression or table_column.column_name
+ column.is_temporal = table_column.is_dttm
+ column.python_date_format = table_column.python_date_format
+ column.advanced_data_type = table_column.advanced_data_type
+ column.extra = table_column.extra
+
+ return column
+
+ def _sync_dataset_column(self, table_column: TableColumn, dataset: Dataset) -> Column:
+ """Sync a TableColumn to a dataset-level Column."""
+ # Look for existing dataset column
+ column = (
+ self.session.query(Column)
+ .filter(
+ and_(
+ Column.dataset_id == dataset.id,
+ Column.name == table_column.column_name,
+ Column.is_aggregation == False, # Not a metric
+ )
+ )
+ .first()
+ )
+
+ if not column:
+ column = Column(
+ dataset_id=dataset.id,
+ name=table_column.column_name,
+ type=table_column.type,
+ expression=table_column.expression or table_column.column_name,
+ verbose_name=table_column.verbose_name,
+ description=table_column.description,
+ is_active=table_column.is_active,
+ is_temporal=table_column.is_dttm,
+ groupby=table_column.groupby,
+ filterable=table_column.filterable,
+ python_date_format=table_column.python_date_format,
+ advanced_data_type=table_column.advanced_data_type,
+ extra=table_column.extra,
+ is_aggregation=False,
+ )
+ self.session.add(column)
+ else:
+ # Update existing column
+ column.type = table_column.type
+ column.expression = table_column.expression or table_column.column_name
+ column.verbose_name = table_column.verbose_name
+ column.description = table_column.description
+ column.is_active = table_column.is_active
+ column.is_temporal = table_column.is_dttm
+ column.groupby = table_column.groupby
+ column.filterable = table_column.filterable
+ column.python_date_format = table_column.python_date_format
+ column.advanced_data_type = table_column.advanced_data_type
+ column.extra = table_column.extra
+
+ return column
+
+ def _sync_metric(self, metric: SqlMetric, dataset: Dataset) -> Column:
+ """Sync a SqlMetric to a Column with is_aggregation=True."""
+ # Look for existing metric column
+ column = (
+ self.session.query(Column)
+ .filter(
+ and_(
+ Column.dataset_id == dataset.id,
+ Column.name == metric.metric_name,
+ Column.is_aggregation == True,
+ )
+ )
+ .first()
+ )
+
+ if not column:
+ column = Column(
+ dataset_id=dataset.id,
+ name=metric.metric_name,
+ type="metric",
+ expression=metric.expression,
+ verbose_name=metric.verbose_name,
+ description=metric.description,
+ warning_text=metric.warning_text,
+ d3format=metric.d3format,
+ currency=metric.currency,
+ extra=metric.extra,
+ is_aggregation=True,
+ is_active=True,
+ )
+ self.session.add(column)
+ else:
+ # Update existing metric column
+ column.expression = metric.expression
+ column.verbose_name = metric.verbose_name
+ column.description = metric.description
+ column.warning_text = metric.warning_text
+ column.d3format = metric.d3format
+ column.currency = metric.currency
+ column.extra = metric.extra
+
+ return column
+
+ def _sync_new_columns_to_sqla_table(self, dataset: Dataset, sqla_table: SqlaTable) -> None:
+ """Sync columns from new models back to SqlaTable."""
+ # This is a simplified reverse sync - in practice, you'd want more sophisticated logic
+ # to handle the bidirectional mapping properly
+
+ # For now, we'll just ensure that the basic structure is in place
+ # Full implementation would require more complex mapping logic
+ pass
+
+
+# Global instance for easy access
+shadow_writer = ShadowWriter()
Review Comment:
### Global State Usage <sub></sub>
<details>
<summary>Tell me more</summary>
###### What is the issue?
Using a global instance violates dependency injection principles and makes
the code tightly coupled and harder to test.
###### Why this matters
Global state makes unit testing difficult and creates hidden dependencies
that are hard to track and maintain.
###### Suggested change ∙ *Feature Preview*
Use dependency injection instead:
```python
class DatasetManager:
def __init__(self, shadow_writer: ShadowWriter):
        self.shadow_writer = shadow_writer

# In application startup/configuration
shadow_writer = ShadowWriter()
dataset_manager = DatasetManager(shadow_writer)
```
###### Provide feedback to improve future suggestions
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/91dbf4c6-514d-4463-a564-8495868351c1/upvote)
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/91dbf4c6-514d-4463-a564-8495868351c1?what_not_true=true)
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/91dbf4c6-514d-4463-a564-8495868351c1?what_out_of_scope=true)
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/91dbf4c6-514d-4463-a564-8495868351c1?what_not_in_standard=true)
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/91dbf4c6-514d-4463-a564-8495868351c1)
</details>
<sub>
💬 Looking for more details? Reply to this comment to chat with Korbit.
</sub>
<!--- korbi internal id:15a95a17-4dea-4402-924e-1c11f6ff5ea8 -->
[](15a95a17-4dea-4402-924e-1c11f6ff5ea8)
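To illustrate the testability point, here is a minimal pytest-style sketch (a hypothetical test module; it assumes a Superset test environment so the import succeeds) that injects a mock session instead of relying on the module-level `shadow_writer` instance:

```python
from unittest.mock import MagicMock

from superset.datasets.shadow_writer import ShadowWriter


def test_shadow_writer_uses_injected_session():
    fake_session = MagicMock()
    writer = ShadowWriter(session=fake_session)

    # Because the session is injected, the writer can be exercised without
    # touching the global db.session.
    assert writer.session is fake_session
```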
##########
superset/migrations/versions/2025-08-25_14-21_add_sip68_dataset_models.py:
##########
@@ -0,0 +1,263 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""add_sip68_dataset_models
+
+Revision ID: sip68_dataset_models
+Revises: c233f5365c9e
+Create Date: 2025-08-25 14:21:00.000000
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects import mysql, postgresql
+
+# revision identifiers, used by Alembic.
+revision = "sip68_dataset_models"
+down_revision = "c233f5365c9e"
+
+
+def upgrade():
+ """Create new dataset models for SIP-68."""
+
+ # Create the sip68_tables table
+ op.create_table(
+ "sip68_tables",
+ sa.Column("id", sa.Integer(), nullable=False),
+ sa.Column("uuid", postgresql.UUID(as_uuid=True), nullable=False),
+ sa.Column("created_on", sa.DateTime(), nullable=True),
+ sa.Column("changed_on", sa.DateTime(), nullable=True),
+ sa.Column("created_by_fk", sa.Integer(), nullable=True),
+ sa.Column("changed_by_fk", sa.Integer(), nullable=True),
+ sa.Column("database_id", sa.Integer(), nullable=False),
+ sa.Column("catalog", sa.String(length=256), nullable=True),
+ sa.Column("schema", sa.String(length=255), nullable=True),
+ sa.Column("name", sa.String(length=250), nullable=False),
+ sa.ForeignKeyConstraint(
+ ["changed_by_fk"],
+ ["ab_user.id"],
+ ),
+ sa.ForeignKeyConstraint(
+ ["created_by_fk"],
+ ["ab_user.id"],
+ ),
+ sa.ForeignKeyConstraint(
+ ["database_id"],
+ ["dbs.id"],
+ ),
+ sa.PrimaryKeyConstraint("id"),
+ sa.UniqueConstraint("uuid"),
+ )
+
+ # Create the sip68_datasets table
+ op.create_table(
+ "sip68_datasets",
+ sa.Column("id", sa.Integer(), nullable=False),
+ sa.Column("uuid", postgresql.UUID(as_uuid=True), nullable=False),
+ sa.Column("created_on", sa.DateTime(), nullable=True),
+ sa.Column("changed_on", sa.DateTime(), nullable=True),
+ sa.Column("created_by_fk", sa.Integer(), nullable=True),
+ sa.Column("changed_by_fk", sa.Integer(), nullable=True),
+ sa.Column("name", sa.String(length=250), nullable=False),
+ sa.Column("kind", sa.Enum("PHYSICAL", "VIRTUAL", name="datasetkind"),
nullable=False),
+ sa.Column("expression", sa.Text(), nullable=False),
+ sa.Column("description", sa.Text(), nullable=True),
+ sa.Column("default_endpoint", sa.Text(), nullable=True),
+ sa.Column("is_featured", sa.Boolean(), nullable=True),
+ sa.Column("filter_select_enabled", sa.Boolean(), nullable=True),
+ sa.Column("offset", sa.Integer(), nullable=True),
+ sa.Column("cache_timeout", sa.Integer(), nullable=True),
+ sa.Column("params", sa.Text(), nullable=True),
+ sa.Column("extra", sa.Text(), nullable=True),
+ sa.Column("main_dttm_col", sa.String(length=250), nullable=True),
+ sa.Column("fetch_values_predicate", sa.Text(), nullable=True),
+ sa.Column("normalize_columns", sa.Boolean(), nullable=True),
+ sa.Column("always_filter_main_dttm", sa.Boolean(), nullable=True),
+ sa.Column("sql", sa.Text(), nullable=True),
+ sa.ForeignKeyConstraint(
+ ["changed_by_fk"],
+ ["ab_user.id"],
+ ),
+ sa.ForeignKeyConstraint(
+ ["created_by_fk"],
+ ["ab_user.id"],
+ ),
+ sa.PrimaryKeyConstraint("id"),
+ sa.UniqueConstraint("uuid"),
+ )
+
+ # Create the sip68_columns table
+ op.create_table(
+ "sip68_columns",
+ sa.Column("id", sa.Integer(), nullable=False),
+ sa.Column("uuid", postgresql.UUID(as_uuid=True), nullable=False),
+ sa.Column("created_on", sa.DateTime(), nullable=True),
+ sa.Column("changed_on", sa.DateTime(), nullable=True),
+ sa.Column("created_by_fk", sa.Integer(), nullable=True),
+ sa.Column("changed_by_fk", sa.Integer(), nullable=True),
+ sa.Column("name", sa.String(length=255), nullable=False),
+ sa.Column("type", sa.String(length=32), nullable=True),
+ sa.Column("expression", sa.Text(), nullable=False),
+ sa.Column("verbose_name", sa.String(length=1024), nullable=True),
+ sa.Column("description", sa.Text(), nullable=True),
+ sa.Column("warning_text", sa.Text(), nullable=True),
+ sa.Column("units", sa.String(length=128), nullable=True),
+ sa.Column("is_active", sa.Boolean(), nullable=True),
+ sa.Column("is_temporal", sa.Boolean(), nullable=True),
+ sa.Column("is_spatial", sa.Boolean(), nullable=True),
+ sa.Column("is_partition", sa.Boolean(), nullable=True),
+ sa.Column("is_aggregation", sa.Boolean(), nullable=True),
+ sa.Column("is_additive", sa.Boolean(), nullable=True),
+ sa.Column("groupby", sa.Boolean(), nullable=True),
+ sa.Column("filterable", sa.Boolean(), nullable=True),
+ sa.Column("cardinality", sa.Integer(), nullable=True),
+ sa.Column("increase_good", sa.Boolean(), nullable=True),
+ sa.Column("d3format", sa.String(length=128), nullable=True),
+ sa.Column("currency", sa.JSON(), nullable=True),
+ sa.Column("python_date_format", sa.String(length=255), nullable=True),
+ sa.Column("advanced_data_type", sa.String(length=255), nullable=True),
+ sa.Column("extra", sa.Text(), nullable=True),
+ sa.Column("table_id", sa.Integer(), nullable=True),
+ sa.Column("dataset_id", sa.Integer(), nullable=True),
+ sa.ForeignKeyConstraint(
+ ["changed_by_fk"],
+ ["ab_user.id"],
+ ),
+ sa.ForeignKeyConstraint(
+ ["created_by_fk"],
+ ["ab_user.id"],
+ ),
+ sa.ForeignKeyConstraint(
+ ["dataset_id"],
+ ["sip68_datasets.id"],
+ ondelete="CASCADE",
+ ),
+ sa.ForeignKeyConstraint(
+ ["table_id"],
+ ["sip68_tables.id"],
+ ondelete="CASCADE",
+ ),
+ sa.PrimaryKeyConstraint("id"),
+ sa.UniqueConstraint("uuid"),
+ )
+
+ # Create the dataset-table association table
+ op.create_table(
+ "sip68_dataset_table_association",
+ sa.Column("dataset_id", sa.Integer(), nullable=True),
+ sa.Column("table_id", sa.Integer(), nullable=True),
+ sa.ForeignKeyConstraint(
+ ["dataset_id"],
+ ["sip68_datasets.id"],
+ ondelete="CASCADE",
+ ),
+ sa.ForeignKeyConstraint(
+ ["table_id"],
+ ["sip68_tables.id"],
+ ondelete="CASCADE",
+ ),
+ )
+
+ # Create the dataset-user association table (for owners)
+ op.create_table(
+ "sip68_dataset_user_association",
+ sa.Column("dataset_id", sa.Integer(), nullable=True),
+ sa.Column("user_id", sa.Integer(), nullable=True),
+ sa.ForeignKeyConstraint(
+ ["dataset_id"],
+ ["sip68_datasets.id"],
+ ondelete="CASCADE",
+ ),
+ sa.ForeignKeyConstraint(
+ ["user_id"],
+ ["ab_user.id"],
+ ondelete="CASCADE",
+ ),
+ )
+
+ # Create indexes for better performance
+ op.create_index(
+ "ix_sip68_tables_database_catalog_schema_name",
+ "sip68_tables",
+ ["database_id", "catalog", "schema", "name"],
+ unique=False,
+ )
+
+ op.create_index(
+ "ix_sip68_datasets_name_expression",
+ "sip68_datasets",
+ ["name", "expression"],
+ unique=False,
+ )
+
+ op.create_index(
+ "ix_sip68_columns_table_name",
+ "sip68_columns",
+ ["table_id", "name"],
+ unique=False,
+ )
+
+ op.create_index(
+ "ix_sip68_columns_dataset_name_aggregation",
+ "sip68_columns",
+ ["dataset_id", "name", "is_aggregation"],
+ unique=False,
+ )
+
+ # Set default values for certain columns
+ op.execute("UPDATE sip68_datasets SET is_featured = false WHERE
is_featured IS NULL")
+ op.execute("UPDATE sip68_datasets SET filter_select_enabled = true WHERE
filter_select_enabled IS NULL")
+ op.execute('UPDATE sip68_datasets SET "offset" = 0 WHERE "offset" IS NULL')
Review Comment:
### Non-atomic default value setting <sub></sub>
<details>
<summary>Tell me more</summary>
###### What is the issue?
Default values are set using UPDATE statements after table creation, which
could miss rows in concurrent operations.
###### Why this matters
Setting defaults via UPDATE statements after table creation is not atomic
and could lead to inconsistent data if new rows are inserted during migration.
###### Suggested change ∙ *Feature Preview*
Set default values during table creation:
```python
sa.Column("is_featured", sa.Boolean(), nullable=False,
server_default=sa.false()),
sa.Column("filter_select_enabled", sa.Boolean(), nullable=False,
server_default=sa.true()),
sa.Column("offset", sa.Integer(), nullable=False, server_default='0'),
```
###### Provide feedback to improve future suggestions
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/ea44f865-a9a8-4e9f-b170-26a083b1403a/upvote)
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/ea44f865-a9a8-4e9f-b170-26a083b1403a?what_not_true=true)
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/ea44f865-a9a8-4e9f-b170-26a083b1403a?what_out_of_scope=true)
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/ea44f865-a9a8-4e9f-b170-26a083b1403a?what_not_in_standard=true)
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/ea44f865-a9a8-4e9f-b170-26a083b1403a)
</details>
<sub>
💬 Looking for more details? Reply to this comment to chat with Korbit.
</sub>
<!--- korbi internal id:cf38a0ae-859f-4458-9ac5-289b994f545c -->
[](cf38a0ae-859f-4458-9ac5-289b994f545c)
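The excerpt shows only `upgrade()`; for completeness, here is a minimal sketch of what a matching `downgrade()` could look like, assuming the drop order simply mirrors creation and that the `datasetkind` enum type must be removed explicitly on PostgreSQL:

```python
def downgrade():
    """Drop the SIP-68 dataset models (sketch, assumptions noted above)."""
    op.drop_index("ix_sip68_columns_dataset_name_aggregation", table_name="sip68_columns")
    op.drop_index("ix_sip68_columns_table_name", table_name="sip68_columns")
    op.drop_index("ix_sip68_datasets_name_expression", table_name="sip68_datasets")
    op.drop_index("ix_sip68_tables_database_catalog_schema_name", table_name="sip68_tables")

    op.drop_table("sip68_dataset_user_association")
    op.drop_table("sip68_dataset_table_association")
    op.drop_table("sip68_columns")
    op.drop_table("sip68_datasets")
    op.drop_table("sip68_tables")

    # The Enum used for "kind" creates a named type on PostgreSQL that is not
    # removed by drop_table.
    sa.Enum(name="datasetkind").drop(op.get_bind(), checkfirst=True)
```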
##########
superset/datasets/shadow_writer.py:
##########
@@ -0,0 +1,433 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Shadow writing mechanism for dataset model migration (SIP-68).
+
+This module provides functionality to keep the old SqlaTable model
+and new Dataset/Table/Column models in sync during the transition period.
+"""
+
+from __future__ import annotations
+
+import logging
+from typing import Any, Optional
+
+from flask import current_app
+
+from superset import is_feature_enabled
+from sqlalchemy import and_
+from sqlalchemy.orm import Session
+
+from superset import db
+from superset.connectors.sqla.models import SqlaTable, SqlMetric, TableColumn
+from superset.models.core import Database
+from superset.models.dataset import Column, Dataset, DatasetKind, Table
+
+logger = logging.getLogger(__name__)
+
+
+class ShadowWriter:
+ """
+ Handles synchronization between old and new dataset models.
+
+ This class implements shadow writing, ensuring that changes to either
+ the old SqlaTable model or the new Dataset/Table/Column models are
+ reflected in both representations.
+ """
+
+ def __init__(self, session: Optional[Session] = None) -> None:
+ self.session = session or db.session
+
+ def is_shadow_writing_enabled(self) -> bool:
+ """Check if shadow writing is enabled via feature flags."""
+ return is_feature_enabled("DATASET_SHADOW_WRITING_ENABLED")
+
+ def sync_sqla_table_to_new_models(self, sqla_table: SqlaTable) -> tuple[Table, Dataset]:
+ """
+ Synchronize a SqlaTable to the new Table and Dataset models.
+
+ Args:
+ sqla_table: The SqlaTable instance to sync
+
+ Returns:
+ Tuple of (Table, Dataset) instances
+ """
+ if not self.is_shadow_writing_enabled():
+ return None, None
+
+ try:
+ # Use a separate session to avoid conflicts with ongoing transactions
+ from sqlalchemy.orm import sessionmaker
+ Session = sessionmaker(bind=self.session.bind)
+ shadow_session = Session()
+
+ # Create a new shadow writer instance with the separate session
+ shadow_writer_instance = ShadowWriter(shadow_session)
+
+ # Find or create the corresponding Table
+ table = shadow_writer_instance._find_or_create_table(sqla_table)
+
+ # Find or create the corresponding Dataset
+ dataset = shadow_writer_instance._find_or_create_dataset(sqla_table, table)
+
+ # Sync columns and metrics
+ shadow_writer_instance._sync_columns_and_metrics(sqla_table, table, dataset)
+
+ # Commit the shadow session
+ shadow_session.commit()
+ shadow_session.close()
+
+ logger.debug(
+ "Synced SqlaTable %s to Table %s and Dataset %s",
+ sqla_table.id,
+ table.id if table else None,
+ dataset.id if dataset else None,
+ )
+
+ return table, dataset
+
+ except Exception as e:
+ logger.warning(
+ "Error syncing SqlaTable %s to new models: %s",
+ sqla_table.id,
+ str(e)
+ )
+ if 'shadow_session' in locals():
+ shadow_session.rollback()
+ shadow_session.close()
+ return None, None
+
+ def sync_new_models_to_sqla_table(self, dataset: Dataset) -> Optional[SqlaTable]:
+ """
+ Synchronize a Dataset back to the SqlaTable model.
+
+ Args:
+ dataset: The Dataset instance to sync
+
+ Returns:
+ The corresponding SqlaTable instance
+ """
+ if not self.is_shadow_writing_enabled():
+ return None
+
+ # For now, we only support physical datasets in reverse sync
+ if dataset.is_virtual:
+ logger.warning("Reverse sync not supported for virtual datasets
yet")
+ return None
+
+ # Get the primary table for this dataset
+ if not dataset.tables:
+ logger.warning("Dataset %s has no associated tables", dataset.id)
+ return None
+
+ primary_table = dataset.tables[0] # For physical datasets, there should be only one
+
+ # Find or create corresponding SqlaTable
+ sqla_table = self._find_or_create_sqla_table(dataset, primary_table)
+
+ # Sync properties
+ sqla_table.table_name = primary_table.name
+ sqla_table.schema = primary_table.schema
+ sqla_table.catalog = primary_table.catalog
+ sqla_table.database_id = primary_table.database_id
+ sqla_table.description = dataset.description
+ sqla_table.main_dttm_col = dataset.main_dttm_col
+ sqla_table.default_endpoint = dataset.default_endpoint
+ sqla_table.is_featured = dataset.is_featured
+ sqla_table.filter_select_enabled = dataset.filter_select_enabled
+ sqla_table.offset = dataset.offset
+ sqla_table.cache_timeout = dataset.cache_timeout
+ sqla_table.params = dataset.params
+ sqla_table.extra = dataset.extra
+ sqla_table.fetch_values_predicate = dataset.fetch_values_predicate
+
+ # Sync columns and metrics back
+ self._sync_new_columns_to_sqla_table(dataset, sqla_table)
+
+ logger.debug(
+ "Synced Dataset %s back to SqlaTable %s",
+ dataset.id,
+ sqla_table.id,
+ )
+
+ return sqla_table
+
+ def _find_or_create_table(self, sqla_table: SqlaTable) -> Table:
+ """Find or create a Table instance corresponding to a SqlaTable."""
+ # Look for existing Table
+ table = (
+ self.session.query(Table)
+ .filter(
+ and_(
+ Table.database_id == sqla_table.database_id,
+ Table.catalog == sqla_table.catalog,
+ Table.schema == sqla_table.schema,
+ Table.name == sqla_table.table_name,
+ )
+ )
+ .first()
+ )
+
+ if not table:
+ table = Table(
+ database_id=sqla_table.database_id,
+ catalog=sqla_table.catalog,
+ schema=sqla_table.schema,
+ name=sqla_table.table_name,
+ )
+ self.session.add(table)
+ self.session.flush() # Get the ID
+
+ return table
+
+ def _find_or_create_dataset(self, sqla_table: SqlaTable, table: Table) -> Dataset:
+ """Find or create a Dataset instance corresponding to a SqlaTable."""
+ # For physical datasets, use the table name as dataset name
+ # For virtual datasets, use the existing table_name (which contains the custom name)
+
+ kind = DatasetKind.VIRTUAL if sqla_table.sql else DatasetKind.PHYSICAL
+ name = sqla_table.table_name
+ expression = sqla_table.sql or sqla_table.table_name
+
+ # Look for existing Dataset
+ # We'll use a combination of name and expression to find existing datasets
+ dataset = (
+ self.session.query(Dataset)
+ .filter(
+ and_(
+ Dataset.name == name,
+ Dataset.expression == expression,
+ )
+ )
+ .first()
+ )
+
+ if not dataset:
+ dataset = Dataset(
+ name=name,
+ kind=kind,
+ expression=expression,
+ description=sqla_table.description,
+ main_dttm_col=sqla_table.main_dttm_col,
+ default_endpoint=sqla_table.default_endpoint,
+ is_featured=sqla_table.is_featured,
+ filter_select_enabled=sqla_table.filter_select_enabled,
+ offset=sqla_table.offset,
+ cache_timeout=sqla_table.cache_timeout,
+ params=sqla_table.params,
+ extra=sqla_table.extra,
+ fetch_values_predicate=sqla_table.fetch_values_predicate,
+ sql=sqla_table.sql, # For backward compatibility
+ )
+ self.session.add(dataset)
+ self.session.flush() # Get the ID
+
+ # Associate with the table
+ if kind == DatasetKind.PHYSICAL:
+ dataset.tables.append(table)
+
+ return dataset
+
+ def _find_or_create_sqla_table(self, dataset: Dataset, table: Table) -> SqlaTable:
+ """Find or create a SqlaTable instance corresponding to a Dataset."""
+ # Look for existing SqlaTable
+ sqla_table = (
+ self.session.query(SqlaTable)
+ .filter(
+ and_(
+ SqlaTable.database_id == table.database_id,
+ SqlaTable.catalog == table.catalog,
+ SqlaTable.schema == table.schema,
+ SqlaTable.table_name == table.name,
+ )
+ )
+ .first()
+ )
+
+ if not sqla_table:
+ sqla_table = SqlaTable(
+ table_name=table.name,
+ schema=table.schema,
+ catalog=table.catalog,
+ database_id=table.database_id,
+ )
+ self.session.add(sqla_table)
+ self.session.flush() # Get the ID
+
+ return sqla_table
+
+ def _sync_columns_and_metrics(
+ self, sqla_table: SqlaTable, table: Table, dataset: Dataset
+ ) -> None:
+ """Sync columns and metrics from SqlaTable to new models."""
+
+ # Sync table columns to Column model (table-level)
+ for table_column in sqla_table.columns:
+ if table_column.is_active: # Only sync active columns
+ self._sync_table_column(table_column, table)
+
+ # Sync columns to Column model (dataset-level) - these are dataset-specific column configs
+ for table_column in sqla_table.columns:
+ if table_column.is_active:
+ self._sync_dataset_column(table_column, dataset)
+
+ # Sync metrics to Column model (dataset-level)
+ for metric in sqla_table.metrics:
+ self._sync_metric(metric, dataset)
+
+ def _sync_table_column(self, table_column: TableColumn, table: Table) -> Column:
+ """Sync a TableColumn to a table-level Column."""
+ # Look for existing column
+ column = (
+ self.session.query(Column)
+ .filter(
+ and_(
+ Column.table_id == table.id,
+ Column.name == table_column.column_name,
+ )
+ )
+ .first()
+ )
+
+ if not column:
+ column = Column(
+ table_id=table.id,
+ name=table_column.column_name,
+ type=table_column.type,
+ expression=table_column.expression or table_column.column_name,
+ is_temporal=table_column.is_dttm,
+ python_date_format=table_column.python_date_format,
+ advanced_data_type=table_column.advanced_data_type,
+ extra=table_column.extra,
+ )
+ self.session.add(column)
+ else:
+ # Update existing column
+ column.type = table_column.type
+ column.expression = table_column.expression or table_column.column_name
+ column.is_temporal = table_column.is_dttm
+ column.python_date_format = table_column.python_date_format
+ column.advanced_data_type = table_column.advanced_data_type
+ column.extra = table_column.extra
+
+ return column
+
+ def _sync_dataset_column(self, table_column: TableColumn, dataset: Dataset) -> Column:
+ """Sync a TableColumn to a dataset-level Column."""
+ # Look for existing dataset column
+ column = (
+ self.session.query(Column)
+ .filter(
+ and_(
+ Column.dataset_id == dataset.id,
+ Column.name == table_column.column_name,
+ Column.is_aggregation == False, # Not a metric
+ )
+ )
+ .first()
+ )
+
+ if not column:
+ column = Column(
+ dataset_id=dataset.id,
+ name=table_column.column_name,
+ type=table_column.type,
+ expression=table_column.expression or table_column.column_name,
+ verbose_name=table_column.verbose_name,
+ description=table_column.description,
+ is_active=table_column.is_active,
+ is_temporal=table_column.is_dttm,
+ groupby=table_column.groupby,
+ filterable=table_column.filterable,
+ python_date_format=table_column.python_date_format,
+ advanced_data_type=table_column.advanced_data_type,
+ extra=table_column.extra,
+ is_aggregation=False,
+ )
+ self.session.add(column)
+ else:
+ # Update existing column
+ column.type = table_column.type
+ column.expression = table_column.expression or table_column.column_name
+ column.verbose_name = table_column.verbose_name
+ column.description = table_column.description
+ column.is_active = table_column.is_active
+ column.is_temporal = table_column.is_dttm
+ column.groupby = table_column.groupby
+ column.filterable = table_column.filterable
+ column.python_date_format = table_column.python_date_format
+ column.advanced_data_type = table_column.advanced_data_type
+ column.extra = table_column.extra
+
+ return column
+
+ def _sync_metric(self, metric: SqlMetric, dataset: Dataset) -> Column:
+ """Sync a SqlMetric to a Column with is_aggregation=True."""
+ # Look for existing metric column
+ column = (
+ self.session.query(Column)
+ .filter(
+ and_(
+ Column.dataset_id == dataset.id,
+ Column.name == metric.metric_name,
+ Column.is_aggregation == True,
+ )
+ )
+ .first()
+ )
+
+ if not column:
+ column = Column(
+ dataset_id=dataset.id,
+ name=metric.metric_name,
+ type="metric",
+ expression=metric.expression,
+ verbose_name=metric.verbose_name,
+ description=metric.description,
+ warning_text=metric.warning_text,
+ d3format=metric.d3format,
+ currency=metric.currency,
+ extra=metric.extra,
+ is_aggregation=True,
+ is_active=True,
+ )
+ self.session.add(column)
+ else:
+ # Update existing metric column
+ column.expression = metric.expression
+ column.verbose_name = metric.verbose_name
+ column.description = metric.description
+ column.warning_text = metric.warning_text
+ column.d3format = metric.d3format
+ column.currency = metric.currency
+ column.extra = metric.extra
+
+ return column
+
+ def _sync_new_columns_to_sqla_table(self, dataset: Dataset, sqla_table: SqlaTable) -> None:
+ """Sync columns from new models back to SqlaTable."""
+ # This is a simplified reverse sync - in practice, you'd want more sophisticated logic
+ # to handle the bidirectional mapping properly
+
+ # For now, we'll just ensure that the basic structure is in place
+ # Full implementation would require more complex mapping logic
+ pass
Review Comment:
### Incomplete Bidirectional Sync Implementation <sub></sub>
<details>
<summary>Tell me more</summary>
###### What is the issue?
The _sync_new_columns_to_sqla_table method is an unimplemented stub; completing
it later will require modifying existing code rather than extending it, which
runs against the Open/Closed Principle.
###### Why this matters
This creates technical debt and could lead to inconsistencies in the
bidirectional synchronization. Future changes will require modifying existing
code rather than extending it.
###### Suggested change ∙ *Feature Preview*
Implement an abstract base class or interface for sync strategies:
```python
from abc import ABC, abstractmethod
from typing import Any


class SyncStrategy(ABC):
    @abstractmethod
    def sync_columns(self, source: Any, target: Any) -> None:
        pass


class NewToSqlaSyncStrategy(SyncStrategy):
    def sync_columns(self, dataset: Dataset, sqla_table: SqlaTable) -> None:
        # Implement full sync logic here
        pass
```
###### Provide feedback to improve future suggestions
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/7ef537a1-be1e-41fb-9027-d3cbd58f8d54/upvote)
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/7ef537a1-be1e-41fb-9027-d3cbd58f8d54?what_not_true=true)
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/7ef537a1-be1e-41fb-9027-d3cbd58f8d54?what_out_of_scope=true)
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/7ef537a1-be1e-41fb-9027-d3cbd58f8d54?what_not_in_standard=true)
[](https://app.korbit.ai/feedback/aa91ff46-6083-4491-9416-b83dd1994b51/7ef537a1-be1e-41fb-9027-d3cbd58f8d54)
</details>
<sub>
💬 Looking for more details? Reply to this comment to chat with Korbit.
</sub>
<!--- korbi internal id:78ee43c2-98be-4f3a-813c-d03eabd0f568 -->
[](78ee43c2-98be-4f3a-813c-d03eabd0f568)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]