This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 21e3601e3b6 [v3-1-test] fix: use useAssetServiceGetDagAssetQueuedEvents to get the correct number of ADRQs (#62868) (#62902)
21e3601e3b6 is described below
commit 21e3601e3b6b948010075e7df39f86a08d511161
Author: Wei Lee <[email protected]>
AuthorDate: Thu Mar 5 12:33:49 2026 +0800
[v3-1-test] fix: use useAssetServiceGetDagAssetQueuedEvents to get the correct number of ADRQs (#62868) (#62902)
---
.../ui/src/pages/DagsList/AssetSchedule.tsx | 44 ++++++++++++++++++----
1 file changed, 37 insertions(+), 7 deletions(-)
diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx b/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx
index 44cf11054bb..fe1962e891a 100644
--- a/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx
@@ -22,7 +22,7 @@ import { useTranslation } from "react-i18next";
import { FiDatabase } from "react-icons/fi";
import { Link as RouterLink } from "react-router-dom";
-import { useAssetServiceNextRunAssets } from "openapi/queries";
+import { useAssetServiceGetDagAssetQueuedEvents, useAssetServiceNextRunAssets
} from "openapi/queries";
import { AssetExpression, type ExpressionType } from
"src/components/AssetExpression";
import type { NextRunEvent } from "src/components/AssetExpression/types";
import { TruncatedText } from "src/components/TruncatedText";
@@ -36,18 +36,48 @@ type Props = {
};
export const AssetSchedule = ({ assetExpression, dagId, latestRunAfter,
timetableSummary }: Props) => {
- const { t: translate } = useTranslation("dags");
- const { data: nextRun, isLoading } = useAssetServiceNextRunAssets({ dagId });
+ const { t: translate } = useTranslation(["dags", "common"]);
+
+ const { data: nextRun, isLoading: isNextRunLoading } =
useAssetServiceNextRunAssets({ dagId });
+ const {
+ data: queuedEventsData,
+ error: queuedEventsError,
+ isLoading: isQueuedEventsLoading,
+ } = useAssetServiceGetDagAssetQueuedEvents({ dagId }, undefined);
const nextRunEvents = (nextRun?.events ?? []) as Array<NextRunEvent>;
+ const queuedEventsErrorStatus =
+ typeof queuedEventsError === "object" && queuedEventsError !== null &&
"status" in queuedEventsError
+ ? (queuedEventsError as { status?: number }).status
+ : undefined;
+ const hasQueuedEventsError = Boolean(queuedEventsError) &&
queuedEventsErrorStatus !== 404;
+ const queuedAssetEvents = new Map<number, string>();
+
+ if (!hasQueuedEventsError) {
+ for (const event of queuedEventsData?.queued_events ?? []) {
+ // Keep a single event timestamp per asset, using the latest one when
duplicates exist.
+ const existingEventDate = queuedAssetEvents.get(event.asset_id);
- const pendingEvents = nextRunEvents.filter((ev) => {
- if (ev.lastUpdate !== null && latestRunAfter !== undefined) {
- return dayjs(ev.lastUpdate).isAfter(latestRunAfter);
+ if (existingEventDate === undefined ||
dayjs(event.created_at).isAfter(existingEventDate)) {
+ queuedAssetEvents.set(event.asset_id, event.created_at);
+ }
}
+ }
+
+ const pendingEvents = nextRunEvents.flatMap((event) => {
+ if (hasQueuedEventsError) {
+ if (event.lastUpdate === null) {
+ return [];
+ }
+
+ return latestRunAfter !== undefined &&
dayjs(event.lastUpdate).isAfter(latestRunAfter) ? [event] : [];
+ }
+
+ const queuedAt = queuedAssetEvents.get(event.id);
- return false;
+ return queuedAt === undefined ? [] : [{ ...event, lastUpdate:
event.lastUpdate ?? queuedAt }];
});
+ const isLoading = isNextRunLoading || isQueuedEventsLoading;
if (!nextRunEvents.length) {
return (