This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 21e3601e3b6 [v3-1-test] fix: use 
useAssetServiceGetDagAssetQueuedEvents to get the correct number of ADRQs 
(#62868) (#62902)
21e3601e3b6 is described below

commit 21e3601e3b6b948010075e7df39f86a08d511161
Author: Wei Lee <[email protected]>
AuthorDate: Thu Mar 5 12:33:49 2026 +0800

    [v3-1-test] fix: use useAssetServiceGetDagAssetQueuedEvents to get the 
correct number of ADRQs (#62868) (#62902)
---
 .../ui/src/pages/DagsList/AssetSchedule.tsx        | 44 ++++++++++++++++++----
 1 file changed, 37 insertions(+), 7 deletions(-)

diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx 
b/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx
index 44cf11054bb..fe1962e891a 100644
--- a/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx
@@ -22,7 +22,7 @@ import { useTranslation } from "react-i18next";
 import { FiDatabase } from "react-icons/fi";
 import { Link as RouterLink } from "react-router-dom";
 
-import { useAssetServiceNextRunAssets } from "openapi/queries";
+import { useAssetServiceGetDagAssetQueuedEvents, useAssetServiceNextRunAssets 
} from "openapi/queries";
 import { AssetExpression, type ExpressionType } from 
"src/components/AssetExpression";
 import type { NextRunEvent } from "src/components/AssetExpression/types";
 import { TruncatedText } from "src/components/TruncatedText";
@@ -36,18 +36,48 @@ type Props = {
 };
 
 export const AssetSchedule = ({ assetExpression, dagId, latestRunAfter, 
timetableSummary }: Props) => {
-  const { t: translate } = useTranslation("dags");
-  const { data: nextRun, isLoading } = useAssetServiceNextRunAssets({ dagId });
+  const { t: translate } = useTranslation(["dags", "common"]);
+
+  const { data: nextRun, isLoading: isNextRunLoading } = 
useAssetServiceNextRunAssets({ dagId });
+  const {
+    data: queuedEventsData,
+    error: queuedEventsError,
+    isLoading: isQueuedEventsLoading,
+  } = useAssetServiceGetDagAssetQueuedEvents({ dagId }, undefined);
 
   const nextRunEvents = (nextRun?.events ?? []) as Array<NextRunEvent>;
+  const queuedEventsErrorStatus =
+    typeof queuedEventsError === "object" && queuedEventsError !== null && 
"status" in queuedEventsError
+      ? (queuedEventsError as { status?: number }).status
+      : undefined;
+  const hasQueuedEventsError = Boolean(queuedEventsError) && 
queuedEventsErrorStatus !== 404;
+  const queuedAssetEvents = new Map<number, string>();
+
+  if (!hasQueuedEventsError) {
+    for (const event of queuedEventsData?.queued_events ?? []) {
+      // Keep a single event timestamp per asset, using the latest one when 
duplicates exist.
+      const existingEventDate = queuedAssetEvents.get(event.asset_id);
 
-  const pendingEvents = nextRunEvents.filter((ev) => {
-    if (ev.lastUpdate !== null && latestRunAfter !== undefined) {
-      return dayjs(ev.lastUpdate).isAfter(latestRunAfter);
+      if (existingEventDate === undefined || 
dayjs(event.created_at).isAfter(existingEventDate)) {
+        queuedAssetEvents.set(event.asset_id, event.created_at);
+      }
     }
+  }
+
+  const pendingEvents = nextRunEvents.flatMap((event) => {
+    if (hasQueuedEventsError) {
+      if (event.lastUpdate === null) {
+        return [];
+      }
+
+      return latestRunAfter !== undefined && 
dayjs(event.lastUpdate).isAfter(latestRunAfter) ? [event] : [];
+    }
+
+    const queuedAt = queuedAssetEvents.get(event.id);
 
-    return false;
+    return queuedAt === undefined ? [] : [{ ...event, lastUpdate: 
event.lastUpdate ?? queuedAt }];
   });
+  const isLoading = isNextRunLoading || isQueuedEventsLoading;
 
   if (!nextRunEvents.length) {
     return (

Reply via email to