michael-s-molina commented on code in PR #29860:
URL: https://github.com/apache/superset/pull/29860#discussion_r1705875765


##########
superset/migrations/shared/catalogs.py:
##########
@@ -112,6 +124,142 @@ def get_known_schemas(database_name: str, session: 
Session) -> list[str]:
     return sorted({name[0][1:-1].split("].[")[-1] for name in names})
 
 
+def print_processed_batch(
+    start_time: datetime, offset: int, total_rows: int, model: ModelType
+) -> None:
+    """
+    Print the progress of batch processing.
+
+    This function logs the progress of processing a batch of rows from a model.
+    It calculates the elapsed time since the start of the batch processing and
+    logs the number of rows processed along with the percentage completion.
+
+    Parameters:
+        start_time (datetime): The start time of the batch processing.
+        offset (int): The current offset in the batch processing.
+        total_rows (int): The total number of rows to process.
+        model (ModelType): The model being processed.
+    """
+    elapsed_time = datetime.now() - start_time
+    elapsed_seconds = elapsed_time.total_seconds()
+    elapsed_formatted = f"{int(elapsed_seconds // 
3600):02}:{int((elapsed_seconds % 3600) // 60):02}:{int(elapsed_seconds % 
60):02}"
+    rows_processed = min(offset + BATCH_SIZE, total_rows)
+    logger.info(
+        f"{elapsed_formatted} - {rows_processed:,} of {total_rows:,} 
{model.__tablename__} rows processed "
+        f"({(rows_processed / total_rows) * 100:.2f}%)"
+    )
+
+
+def update_catalog_column(
+    session: Session, database: Database, catalog: str, downgrade: bool = False
+) -> None:
+    """
+    Update the `catalog` column in the specified models to the given catalog.
+
+    This function iterates over a list of models defined by MODELS and updates
+    the `catalog` columnto the specified catalog or None depending on the 
downgrade
+    parameter. The update is performedin batches to optimize performance and 
reduce
+    memory usage.
+
+    Parameters:
+        session (Session): The SQLAlchemy session to use for database 
operations.
+        database (Database): The database instance containing the models to 
update.
+        catalog (Catalog): The new catalog value to set in the `catalog` 
column or
+            the default catalog if `downgrade` is True.
+        downgrade (bool): If True, the `catalog` column is set to None where 
the
+            catalog matches the specified catalog.
+    """
+    start_time = datetime.now()
+
+    logger.info(f"Updating {database.database_name} models to catalog 
{catalog}")
+
+    for model, column in MODELS:
+        # Get the total number of rows that match the condition
+        total_rows = (
+            session.query(sa.func.count(model.id))
+            .filter(getattr(model, column) == database.id)
+            .filter(model.catalog == catalog if downgrade else True)
+            .scalar()
+        )
+
+        logger.info(
+            f"Total rows to be processed for {model.__tablename__}: 
{total_rows:,}"
+        )
+
+        # Update in batches using row numbers
+        for i in range(0, total_rows, BATCH_SIZE):
+            subquery = (
+                session.query(model.id)
+                .filter(getattr(model, column) == database.id)
+                .filter(model.catalog == catalog if downgrade else True)
+                .order_by(model.id)
+                .offset(i)
+                .limit(BATCH_SIZE)
+                .subquery()
+            )
+            session.execute(
+                sa.update(model)
+                .where(model.id == subquery.c.id)
+                .values(catalog=None if downgrade else catalog)
+                .execution_options(synchronize_session=False)
+            )
+            # Commit the transaction after each batch
+            session.commit()

Review Comment:
   @eschutho To answer your question more generally: there are several possible 
strategies for reverting batch migrations, such as logging the changes so they 
can be reverted later, making migrations reversible by storing the original 
value in a dedicated column (we do this for chart migrations), or using 
database checkpoints.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to