This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 36e02fab51f fix: use useAssetServiceGetDagAssetQueuedEvents to get the 
correct number of ADRQs (#62868)
36e02fab51f is described below

commit 36e02fab51f9ff7227da4284770a8e396007d1dd
Author: Wei Lee <[email protected]>
AuthorDate: Thu Mar 5 03:58:16 2026 +0800

    fix: use useAssetServiceGetDagAssetQueuedEvents to get the correct number 
of ADRQs (#62868)
---
 .../ui/src/pages/DagsList/AssetSchedule.tsx        | 46 ++++++++++++++++++----
 1 file changed, 38 insertions(+), 8 deletions(-)

diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx 
b/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx
index dd7a7a45d3b..d198a0c7f49 100644
--- a/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx
@@ -23,7 +23,7 @@ import { useTranslation } from "react-i18next";
 import { FiDatabase } from "react-icons/fi";
 import { Link as RouterLink } from "react-router-dom";
 
-import { useAssetServiceNextRunAssets } from "openapi/queries";
+import { useAssetServiceGetDagAssetQueuedEvents, useAssetServiceNextRunAssets 
} from "openapi/queries";
 import { AssetExpression, type ExpressionType } from 
"src/components/AssetExpression";
 import type { NextRunEvent } from "src/components/AssetExpression/types";
 import { TruncatedText } from "src/components/TruncatedText";
@@ -61,22 +61,52 @@ const PartitionSchedule = ({ dagId, isLoading, pendingCount 
}: PartitionSchedule
 
 export const AssetSchedule = ({ assetExpression, dagId, latestRunAfter, 
timetableSummary }: Props) => {
   const { t: translate } = useTranslation(["dags", "common"]);
-  const { data: nextRun, isLoading } = useAssetServiceNextRunAssets({ dagId });
 
   const isPartitioned = timetableSummary === "Partitioned Asset";
+  const { data: nextRun, isLoading: isNextRunLoading } = 
useAssetServiceNextRunAssets({ dagId });
+  const {
+    data: queuedEventsData,
+    error: queuedEventsError,
+    isLoading: isQueuedEventsLoading,
+  } = useAssetServiceGetDagAssetQueuedEvents({ dagId }, undefined, { enabled: 
!isPartitioned });
 
   const nextRunEvents = (nextRun?.events ?? []) as Array<NextRunEvent>;
-
-  const pendingEvents = nextRunEvents.filter((ev) => {
-    if (ev.lastUpdate === null) {
-      return false;
+  const queuedEventsErrorStatus =
+    typeof queuedEventsError === "object" && queuedEventsError !== null && 
"status" in queuedEventsError
+      ? (queuedEventsError as { status?: number }).status
+      : undefined;
+  const hasQueuedEventsError = Boolean(queuedEventsError) && 
queuedEventsErrorStatus !== 404;
+  const queuedAssetEvents = new Map<number, string>();
+
+  if (!isPartitioned && !hasQueuedEventsError) {
+    for (const event of queuedEventsData?.queued_events ?? []) {
+      // Keep a single event timestamp per asset, using the latest one when 
duplicates exist.
+      const existingEventDate = queuedAssetEvents.get(event.asset_id);
+
+      if (existingEventDate === undefined || 
dayjs(event.created_at).isAfter(existingEventDate)) {
+        queuedAssetEvents.set(event.asset_id, event.created_at);
+      }
     }
+  }
+
+  const pendingEvents = nextRunEvents.flatMap((event) => {
     if (isPartitioned) {
-      return true;
+      return event.lastUpdate === null ? [] : [event];
     }
 
-    return latestRunAfter !== undefined && 
dayjs(ev.lastUpdate).isAfter(latestRunAfter);
+    if (hasQueuedEventsError) {
+      if (event.lastUpdate === null) {
+        return [];
+      }
+
+      return latestRunAfter !== undefined && 
dayjs(event.lastUpdate).isAfter(latestRunAfter) ? [event] : [];
+    }
+
+    const queuedAt = queuedAssetEvents.get(event.id);
+
+    return queuedAt === undefined ? [] : [{ ...event, lastUpdate: 
event.lastUpdate ?? queuedAt }];
   });
+  const isLoading = isNextRunLoading || (!isPartitioned && 
isQueuedEventsLoading);
 
   if (!nextRunEvents.length) {
     return (

Reply via email to