kaxil commented on a change in pull request #19425:
URL: https://github.com/apache/airflow/pull/19425#discussion_r743700766



##########
File path: airflow/utils/db.py
##########
@@ -709,18 +709,63 @@ def _format_dangling_error(source_table, target_table, 
invalid_count, reason):
     )
 
 
-def _move_dangling_run_data_to_new_table(session, source_table, target_table):
+def _move_dangling_run_data_to_new_table(session, source_table: "Table", 
target_table_name: str):
     where_clause = "where dag_id is null or run_id is null or execution_date 
is null"
-    session.execute(text(f"create table {target_table} as select * from 
{source_table} {where_clause}"))
-    session.execute(text(f"delete from {source_table} {where_clause}"))
+    _move_dangling_table(session, source_table, target_table_name, 
where_clause)
+
+
+def _move_dangling_table(session, source_table: "Table", target_table_name: 
str, where_clause: str):
+    dialect_name = session.get_bind().dialect.name
+
+    delete_where = " AND ".join(
+        f"{source_table.name}.{c.name} = d.{c.name}" for c in 
source_table.primary_key.columns
+    )
+    if dialect_name == "mssql":
+        session.execute(
+            text(f"select source.* into {target_table_name} from 
{source_table} as source {where_clause}")
+        )
+        session.execute(
+            text(
+                f"delete from {source_table} from {source_table} join 
{target_table_name} AS d ON "
+                + delete_where
+            )
+        )
+    else:
+        # Postgres, MySQL and SQLite all have the same CREATE TABLE a AS 
SELECT ... syntax
+        session.execute(
+            text(
+                f"create table {target_table_name} as select source.* from 
{source_table} as source "
+                + where_clause
+            )
+        )
+
+        # But different join-delete syntax.
+        if dialect_name == "mysql":
+            session.execute(
+                text(
+                    f"delete {source_table} from {source_table} join 
{target_table_name} as d on "
+                    + delete_where

Review comment:
       ```suggestion
                       + {delete_where}
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to