This is an automated email from the ASF dual-hosted git repository.

shuaixu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 677f3cab93 [GLUTEN-10352][FLINK] Support nexmark test from q4 to q9 
(#10468)
677f3cab93 is described below

commit 677f3cab93e8e0cb6bb5f9fdcb978df99ea8b007
Author: kevinyhzou <[email protected]>
AuthorDate: Thu Aug 21 12:14:35 2025 +0800

    [GLUTEN-10352][FLINK] Support nexmark test from q4 to q9 (#10468)
    
    * add nexmark test q4~q9
---
 .github/workflows/flink.yml                        |  2 +-
 gluten-flink/docs/Flink.md                         |  2 +-
 .../org/apache/gluten/rexnode/WindowUtils.java     |  3 ++
 gluten-flink/ut/src/test/resources/nexmark/q4.sql  | 18 ++++++++++
 gluten-flink/ut/src/test/resources/nexmark/q5.sql  | 38 ++++++++++++++++++++++
 gluten-flink/ut/src/test/resources/nexmark/q7.sql  | 21 ++++++++++++
 gluten-flink/ut/src/test/resources/nexmark/q8.sql  | 27 +++++++++++++++
 gluten-flink/ut/src/test/resources/nexmark/q9.sql  | 31 ++++++++++++++++++
 8 files changed, 140 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml
index cf2389f9bd..ed579c0729 100644
--- a/.github/workflows/flink.yml
+++ b/.github/workflows/flink.yml
@@ -43,7 +43,7 @@ jobs:
           source /opt/rh/gcc-toolset-11/enable
           sudo dnf install -y patchelf
           git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
-          cd velox4j && git reset --hard 
4b92595a72bf64453c2a59a21aa49a7b9898ef91
+          cd velox4j && git reset --hard 
a5e3e9d7f11440f8c4eafeff88ae6945186d02c1
           git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
           mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
           cd ..
diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md
index 0fcf1bfacb..d0c985cc4c 100644
--- a/gluten-flink/docs/Flink.md
+++ b/gluten-flink/docs/Flink.md
@@ -48,7 +48,7 @@ As some features have not been committed to upstream, you 
have to use the follow
 ## fetch velox4j code
 git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
 cd velox4j
-git reset --hard 4b92595a72bf64453c2a59a21aa49a7b9898ef91
+git reset --hard a5e3e9d7f11440f8c4eafeff88ae6945186d02c1
 mvn clean install
 ```
 **Get gluten**
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java
index c7bf470a14..60b6f71fa9 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java
@@ -18,6 +18,7 @@ package org.apache.gluten.rexnode;
 
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.table.planner.plan.logical.HoppingWindowSpec;
+import 
org.apache.flink.table.planner.plan.logical.SliceAttachedWindowingStrategy;
 import 
org.apache.flink.table.planner.plan.logical.TimeAttributeWindowingStrategy;
 import org.apache.flink.table.planner.plan.logical.TumblingWindowSpec;
 import 
org.apache.flink.table.planner.plan.logical.WindowAttachedWindowingStrategy;
@@ -66,6 +67,8 @@ public class WindowUtils {
     } else if (windowing instanceof WindowAttachedWindowingStrategy) {
       rowtimeIndex = ((WindowAttachedWindowingStrategy) 
windowing).getWindowEnd();
       windowType = 1;
+    } else if (windowing instanceof SliceAttachedWindowingStrategy) {
+      rowtimeIndex = ((SliceAttachedWindowingStrategy) 
windowing).getSliceEnd();
     }
     return new Tuple5<Long, Long, Long, Integer, Integer>(
         size, slide, offset, rowtimeIndex, windowType);
diff --git a/gluten-flink/ut/src/test/resources/nexmark/q4.sql 
b/gluten-flink/ut/src/test/resources/nexmark/q4.sql
new file mode 100644
index 0000000000..0cfd76194a
--- /dev/null
+++ b/gluten-flink/ut/src/test/resources/nexmark/q4.sql
@@ -0,0 +1,18 @@
+CREATE TABLE nexmark_q4 (
+  id BIGINT,
+  final BIGINT
+) WITH (
+  'connector' = 'blackhole'
+);
+
+INSERT INTO nexmark_q4
+SELECT
+    Q.category,
+    AVG(Q.final)
+FROM (
+    SELECT MAX(B.price) AS final, A.category
+    FROM auction A, bid B
+    WHERE A.id = B.auction AND B.`dateTime` BETWEEN A.`dateTime` AND A.expires
+    GROUP BY A.id, A.category
+) Q
+GROUP BY Q.category;
diff --git a/gluten-flink/ut/src/test/resources/nexmark/q5.sql 
b/gluten-flink/ut/src/test/resources/nexmark/q5.sql
new file mode 100644
index 0000000000..98954bacf2
--- /dev/null
+++ b/gluten-flink/ut/src/test/resources/nexmark/q5.sql
@@ -0,0 +1,38 @@
+CREATE TABLE nexmark_q5 (
+  auction  BIGINT,
+  num  BIGINT
+) WITH (
+  'connector' = 'blackhole'
+);
+
+INSERT INTO nexmark_q5
+SELECT AuctionBids.auction, AuctionBids.num
+ FROM (
+   SELECT
+     auction,
+     count(*) AS num,
+     window_start AS starttime,
+     window_end AS endtime
+     FROM TABLE(
+             HOP(TABLE bid, DESCRIPTOR(`dateTime`), INTERVAL '2' SECOND, 
INTERVAL '10' SECOND))
+     GROUP BY auction, window_start, window_end
+ ) AS AuctionBids
+ JOIN (
+   SELECT
+     max(CountBids.num) AS maxn,
+     CountBids.starttime,
+     CountBids.endtime
+   FROM (
+     SELECT
+       count(*) AS num,
+       window_start AS starttime,
+       window_end AS endtime
+     FROM TABLE(
+                HOP(TABLE bid, DESCRIPTOR(`dateTime`), INTERVAL '2' SECOND, 
INTERVAL '10' SECOND))
+     GROUP BY auction, window_start, window_end
+     ) AS CountBids
+   GROUP BY CountBids.starttime, CountBids.endtime
+ ) AS MaxBids
+ ON AuctionBids.starttime = MaxBids.starttime AND
+    AuctionBids.endtime = MaxBids.endtime AND
+    AuctionBids.num >= MaxBids.maxn;
diff --git a/gluten-flink/ut/src/test/resources/nexmark/q7.sql 
b/gluten-flink/ut/src/test/resources/nexmark/q7.sql
new file mode 100644
index 0000000000..1b0ec308e9
--- /dev/null
+++ b/gluten-flink/ut/src/test/resources/nexmark/q7.sql
@@ -0,0 +1,21 @@
+CREATE TABLE nexmark_q7 (
+  auction  BIGINT,
+  bidder  BIGINT,
+  price  BIGINT,
+  `dateTime`  TIMESTAMP(3),
+  extra  VARCHAR
+) WITH (
+  'connector' = 'blackhole'
+);
+
+INSERT INTO nexmark_q7
+SELECT B.auction, B.price, B.bidder, B.`dateTime`, B.extra
+from bid B
+JOIN (
+  SELECT MAX(price) AS maxprice, window_end as `dateTime`
+  FROM TABLE(
+          TUMBLE(TABLE bid, DESCRIPTOR(`dateTime`), INTERVAL '10' SECOND))
+  GROUP BY window_start, window_end
+) B1
+ON B.price = B1.maxprice
+WHERE B.`dateTime` BETWEEN B1.`dateTime`  - INTERVAL '10' SECOND AND 
B1.`dateTime`;
diff --git a/gluten-flink/ut/src/test/resources/nexmark/q8.sql 
b/gluten-flink/ut/src/test/resources/nexmark/q8.sql
new file mode 100644
index 0000000000..1b112f9fc7
--- /dev/null
+++ b/gluten-flink/ut/src/test/resources/nexmark/q8.sql
@@ -0,0 +1,27 @@
+CREATE TABLE nexmark_q8 (
+  id  BIGINT,
+  name  VARCHAR,
+  stime  TIMESTAMP(3)
+) WITH (
+  'connector' = 'blackhole'
+);
+
+INSERT INTO nexmark_q8
+SELECT P.id, P.name, P.starttime
+FROM (
+  SELECT id, name,
+        window_start AS starttime,
+        window_end AS endtime
+  FROM TABLE(
+            TUMBLE(TABLE person, DESCRIPTOR(`dateTime`), INTERVAL '10' SECOND))
+  GROUP BY id, name, window_start, window_end
+) P
+JOIN (
+  SELECT seller,
+        window_start AS starttime,
+        window_end AS endtime
+  FROM TABLE(
+        TUMBLE(TABLE auction, DESCRIPTOR(`dateTime`), INTERVAL '10' SECOND))
+  GROUP BY seller, window_start, window_end
+) A
+ON P.id = A.seller AND P.starttime = A.starttime AND P.endtime = A.endtime;
diff --git a/gluten-flink/ut/src/test/resources/nexmark/q9.sql 
b/gluten-flink/ut/src/test/resources/nexmark/q9.sql
new file mode 100644
index 0000000000..60f2536084
--- /dev/null
+++ b/gluten-flink/ut/src/test/resources/nexmark/q9.sql
@@ -0,0 +1,31 @@
+CREATE TABLE nexmark_q9 (
+  id  BIGINT,
+  itemName  VARCHAR,
+  description  VARCHAR,
+  initialBid  BIGINT,
+  reserve  BIGINT,
+  `dateTime`  TIMESTAMP(3),
+  expires  TIMESTAMP(3),
+  seller  BIGINT,
+  category  BIGINT,
+  extra  VARCHAR,
+  auction  BIGINT,
+  bidder  BIGINT,
+  price  BIGINT,
+  bid_dateTime  TIMESTAMP(3),
+  bid_extra  VARCHAR
+) WITH (
+  'connector' = 'blackhole'
+);
+
+INSERT INTO nexmark_q9
+SELECT
+    id, itemName, description, initialBid, reserve, `dateTime`, expires, 
seller, category, extra,
+    auction, bidder, price, bid_dateTime, bid_extra
+FROM (
+   SELECT A.*, B.auction, B.bidder, B.price, B.`dateTime` AS bid_dateTime, 
B.extra AS bid_extra,
+     ROW_NUMBER() OVER (PARTITION BY A.id ORDER BY B.price DESC, B.`dateTime` 
ASC) AS rownum
+   FROM auction A, bid B
+   WHERE A.id = B.auction AND B.`dateTime` BETWEEN A.`dateTime` AND A.expires
+)
+WHERE rownum <= 1;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to