This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 36e02fab51f fix: use useAssetServiceGetDagAssetQueuedEvents to get the
correct number of ADRQs (#62868)
36e02fab51f is described below
commit 36e02fab51f9ff7227da4284770a8e396007d1dd
Author: Wei Lee <[email protected]>
AuthorDate: Thu Mar 5 03:58:16 2026 +0800
fix: use useAssetServiceGetDagAssetQueuedEvents to get the correct number
of ADRQs (#62868)
---
.../ui/src/pages/DagsList/AssetSchedule.tsx | 46 ++++++++++++++++++----
1 file changed, 38 insertions(+), 8 deletions(-)
diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx b/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx
index dd7a7a45d3b..d198a0c7f49 100644
--- a/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx
@@ -23,7 +23,7 @@ import { useTranslation } from "react-i18next";
import { FiDatabase } from "react-icons/fi";
import { Link as RouterLink } from "react-router-dom";
-import { useAssetServiceNextRunAssets } from "openapi/queries";
+import { useAssetServiceGetDagAssetQueuedEvents, useAssetServiceNextRunAssets } from "openapi/queries";
import { AssetExpression, type ExpressionType } from "src/components/AssetExpression";
import type { NextRunEvent } from "src/components/AssetExpression/types";
import { TruncatedText } from "src/components/TruncatedText";
@@ -61,22 +61,52 @@ const PartitionSchedule = ({ dagId, isLoading, pendingCount }: PartitionSchedule
export const AssetSchedule = ({ assetExpression, dagId, latestRunAfter, timetableSummary }: Props) => {
const { t: translate } = useTranslation(["dags", "common"]);
- const { data: nextRun, isLoading } = useAssetServiceNextRunAssets({ dagId });
const isPartitioned = timetableSummary === "Partitioned Asset";
+ const { data: nextRun, isLoading: isNextRunLoading } = useAssetServiceNextRunAssets({ dagId });
+ const {
+ data: queuedEventsData,
+ error: queuedEventsError,
+ isLoading: isQueuedEventsLoading,
+ } = useAssetServiceGetDagAssetQueuedEvents({ dagId }, undefined, { enabled: !isPartitioned });
const nextRunEvents = (nextRun?.events ?? []) as Array<NextRunEvent>;
-
- const pendingEvents = nextRunEvents.filter((ev) => {
- if (ev.lastUpdate === null) {
- return false;
+ const queuedEventsErrorStatus =
+ typeof queuedEventsError === "object" && queuedEventsError !== null && "status" in queuedEventsError
+ ? (queuedEventsError as { status?: number }).status
+ : undefined;
+ const hasQueuedEventsError = Boolean(queuedEventsError) && queuedEventsErrorStatus !== 404;
+ const queuedAssetEvents = new Map<number, string>();
+
+ if (!isPartitioned && !hasQueuedEventsError) {
+ for (const event of queuedEventsData?.queued_events ?? []) {
+ // Keep a single event timestamp per asset, using the latest one when duplicates exist.
+ const existingEventDate = queuedAssetEvents.get(event.asset_id);
+
+ if (existingEventDate === undefined || dayjs(event.created_at).isAfter(existingEventDate)) {
+ queuedAssetEvents.set(event.asset_id, event.created_at);
+ }
}
+ }
+
+ const pendingEvents = nextRunEvents.flatMap((event) => {
if (isPartitioned) {
- return true;
+ return event.lastUpdate === null ? [] : [event];
}
- return latestRunAfter !== undefined && dayjs(ev.lastUpdate).isAfter(latestRunAfter);
+ if (hasQueuedEventsError) {
+ if (event.lastUpdate === null) {
+ return [];
+ }
+
+ return latestRunAfter !== undefined && dayjs(event.lastUpdate).isAfter(latestRunAfter) ? [event] : [];
+ }
+
+ const queuedAt = queuedAssetEvents.get(event.id);
+
+ return queuedAt === undefined ? [] : [{ ...event, lastUpdate: event.lastUpdate ?? queuedAt }];
});
+ const isLoading = isNextRunLoading || (!isPartitioned && isQueuedEventsLoading);
if (!nextRunEvents.length) {
return (