amoghrajesh commented on code in PR #63628:
URL: https://github.com/apache/airflow/pull/63628#discussion_r2946471431


##########
airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py:
##########
@@ -122,56 +63,159 @@ def migrate_all_data():
             op.execute("DELETE FROM deadline")
             return
 
-        deadline_table = table(
+        conn = op.get_bind()
+        dialect = conn.dialect.name
+
+        if dialect == "postgresql":
+            # PostgreSQL: use gen_random_uuid() and jsonb operations to avoid Python
+            # deserialization. The callback JSON is serde-wrapped:
+            #   {"__data__": {"path": "...", "kwargs": {...}}, "__classname__": "...", ...}
+            # We extract __data__ fields and merge in prefix + dag_id.
+            # A writable CTE handles both the INSERT into callback and the UPDATE of
+            # deadline in a single statement, so the generated UUID is shared.
+            conn.execute(
+                sa.text("""
+                    WITH new_callbacks AS (
+                        SELECT
+                            d.id AS deadline_id,
+                            gen_random_uuid() AS callback_id,
+                            jsonb_build_object(
+                                'path', d.callback->'__data__'->>'path',
+                                'kwargs', d.callback->'__data__'->'kwargs',
+                                'prefix', :prefix,
+                                'dag_id', dr.dag_id
+                            )::json AS callback_data,
+                            CASE
+                                WHEN d.callback_state IN ('failed', 'success') THEN d.callback_state
+                                ELSE :pending
+                            END AS cb_state,
+                            CASE
+                                WHEN d.callback_state IN ('failed', 'success') THEN true
+                                ELSE false
+                            END AS is_missed
+                        FROM deadline d
+                        JOIN dag_run dr ON d.dagrun_id = dr.id
+                        WHERE d.callback_id IS NULL
+                    ),
+                    inserted AS (
+                        INSERT INTO callback (id, type, fetch_method, data, state, priority_weight, created_at)
+                        SELECT
+                            callback_id, :cb_type, :fetch_method, callback_data, cb_state, 1, NOW()
+                        FROM new_callbacks
+                    )
+                    UPDATE deadline
+                    SET callback_id = nc.callback_id, missed = nc.is_missed
+                    FROM new_callbacks nc
+                    WHERE deadline.id = nc.deadline_id
+                """),
+                {
+                    "cb_type": _CALLBACK_TYPE_TRIGGERER,
+                    "fetch_method": _CALLBACK_FETCH_METHOD_IMPORT_PATH,
+                    "prefix": _CALLBACK_METRICS_PREFIX,
+                    "pending": _CALLBACK_STATE_PENDING,
+                },
+            )

Review Comment:
   Wondering if we can do the work here in batches, with a `LIMIT` as well?
   
   This CTE might bloat memory significantly when there is a lot of data.
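   The batching being suggested could look roughly like the sketch below. It is a minimal illustration only: it uses `sqlite3` and a deliberately simplified schema (the table and column names mirror the migration but the real change would keep the Postgres writable CTE, adding a `LIMIT :batch` to the driving `SELECT` and looping until no rows remain). The point is the bounded-batch loop with a commit per batch.
   
   ```python
   import sqlite3
   import uuid
   
   BATCH_SIZE = 2  # tiny for demonstration; a real migration might use e.g. 10_000
   
   conn = sqlite3.connect(":memory:")
   conn.executescript("""
       CREATE TABLE deadline (id TEXT PRIMARY KEY, callback_id TEXT);
       CREATE TABLE callback (id TEXT PRIMARY KEY, data TEXT);
   """)
   conn.executemany(
       "INSERT INTO deadline (id, callback_id) VALUES (?, NULL)",
       [(f"d{i}",) for i in range(5)],
   )
   
   while True:
       # Fetch one bounded batch of not-yet-migrated rows.
       rows = conn.execute(
           "SELECT id FROM deadline WHERE callback_id IS NULL LIMIT ?",
           (BATCH_SIZE,),
       ).fetchall()
       if not rows:
           break
       for (deadline_id,) in rows:
           cb_id = str(uuid.uuid4())
           conn.execute("INSERT INTO callback (id, data) VALUES (?, ?)", (cb_id, "{}"))
           conn.execute(
               "UPDATE deadline SET callback_id = ? WHERE id = ?",
               (cb_id, deadline_id),
           )
       # Committing per batch keeps transaction size and memory bounded.
       conn.commit()
   
   remaining = conn.execute(
       "SELECT COUNT(*) FROM deadline WHERE callback_id IS NULL"
   ).fetchone()[0]
   print(remaining)  # → 0
   ```
   
   In the Postgres version the same loop would re-run the single CTE statement with the `LIMIT` applied in `new_callbacks`, stopping once the `UPDATE`'s rowcount is zero.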



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to