This is an automated email from the ASF dual-hosted git repository.
shuaixu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 677f3cab93 [GLUTEN-10352][FLINK] Support nexmark test from q4 to q9 (#10468)
677f3cab93 is described below
commit 677f3cab93e8e0cb6bb5f9fdcb978df99ea8b007
Author: kevinyhzou <[email protected]>
AuthorDate: Thu Aug 21 12:14:35 2025 +0800
[GLUTEN-10352][FLINK] Support nexmark test from q4 to q9 (#10468)
* add nexmark test q4~q9
---
.github/workflows/flink.yml | 2 +-
gluten-flink/docs/Flink.md | 2 +-
.../org/apache/gluten/rexnode/WindowUtils.java | 3 ++
gluten-flink/ut/src/test/resources/nexmark/q4.sql | 18 ++++++++++
gluten-flink/ut/src/test/resources/nexmark/q5.sql | 38 ++++++++++++++++++++++
gluten-flink/ut/src/test/resources/nexmark/q7.sql | 21 ++++++++++++
gluten-flink/ut/src/test/resources/nexmark/q8.sql | 27 +++++++++++++++
gluten-flink/ut/src/test/resources/nexmark/q9.sql | 31 ++++++++++++++++++
8 files changed, 140 insertions(+), 2 deletions(-)
diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml
index cf2389f9bd..ed579c0729 100644
--- a/.github/workflows/flink.yml
+++ b/.github/workflows/flink.yml
@@ -43,7 +43,7 @@ jobs:
source /opt/rh/gcc-toolset-11/enable
sudo dnf install -y patchelf
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
- cd velox4j && git reset --hard 4b92595a72bf64453c2a59a21aa49a7b9898ef91
+ cd velox4j && git reset --hard a5e3e9d7f11440f8c4eafeff88ae6945186d02c1
git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
cd ..
diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md
index 0fcf1bfacb..d0c985cc4c 100644
--- a/gluten-flink/docs/Flink.md
+++ b/gluten-flink/docs/Flink.md
@@ -48,7 +48,7 @@ As some features have not been committed to upstream, you have to use the follow
## fetch velox4j code
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
cd velox4j
-git reset --hard 4b92595a72bf64453c2a59a21aa49a7b9898ef91
+git reset --hard a5e3e9d7f11440f8c4eafeff88ae6945186d02c1
mvn clean install
```
**Get gluten**
diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java
index c7bf470a14..60b6f71fa9 100644
--- a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java
+++ b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java
@@ -18,6 +18,7 @@ package org.apache.gluten.rexnode;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.table.planner.plan.logical.HoppingWindowSpec;
+import org.apache.flink.table.planner.plan.logical.SliceAttachedWindowingStrategy;
import org.apache.flink.table.planner.plan.logical.TimeAttributeWindowingStrategy;
import org.apache.flink.table.planner.plan.logical.TumblingWindowSpec;
import org.apache.flink.table.planner.plan.logical.WindowAttachedWindowingStrategy;
@@ -66,6 +67,8 @@ public class WindowUtils {
} else if (windowing instanceof WindowAttachedWindowingStrategy) {
rowtimeIndex = ((WindowAttachedWindowingStrategy) windowing).getWindowEnd();
windowType = 1;
+ } else if (windowing instanceof SliceAttachedWindowingStrategy) {
+ rowtimeIndex = ((SliceAttachedWindowingStrategy) windowing).getSliceEnd();
}
return new Tuple5<Long, Long, Long, Integer, Integer>(
size, slide, offset, rowtimeIndex, windowType);
diff --git a/gluten-flink/ut/src/test/resources/nexmark/q4.sql b/gluten-flink/ut/src/test/resources/nexmark/q4.sql
new file mode 100644
index 0000000000..0cfd76194a
--- /dev/null
+++ b/gluten-flink/ut/src/test/resources/nexmark/q4.sql
@@ -0,0 +1,18 @@
+CREATE TABLE nexmark_q4 (
+ id BIGINT,
+ final BIGINT
+) WITH (
+ 'connector' = 'blackhole'
+);
+
+INSERT INTO nexmark_q4
+SELECT
+ Q.category,
+ AVG(Q.final)
+FROM (
+ SELECT MAX(B.price) AS final, A.category
+ FROM auction A, bid B
+ WHERE A.id = B.auction AND B.`dateTime` BETWEEN A.`dateTime` AND A.expires
+ GROUP BY A.id, A.category
+) Q
+GROUP BY Q.category;
diff --git a/gluten-flink/ut/src/test/resources/nexmark/q5.sql b/gluten-flink/ut/src/test/resources/nexmark/q5.sql
new file mode 100644
index 0000000000..98954bacf2
--- /dev/null
+++ b/gluten-flink/ut/src/test/resources/nexmark/q5.sql
@@ -0,0 +1,38 @@
+CREATE TABLE nexmark_q5 (
+ auction BIGINT,
+ num BIGINT
+) WITH (
+ 'connector' = 'blackhole'
+);
+
+INSERT INTO nexmark_q5
+SELECT AuctionBids.auction, AuctionBids.num
+ FROM (
+ SELECT
+ auction,
+ count(*) AS num,
+ window_start AS starttime,
+ window_end AS endtime
+ FROM TABLE(
+ HOP(TABLE bid, DESCRIPTOR(`dateTime`), INTERVAL '2' SECOND, INTERVAL '10' SECOND))
+ GROUP BY auction, window_start, window_end
+ ) AS AuctionBids
+ JOIN (
+ SELECT
+ max(CountBids.num) AS maxn,
+ CountBids.starttime,
+ CountBids.endtime
+ FROM (
+ SELECT
+ count(*) AS num,
+ window_start AS starttime,
+ window_end AS endtime
+ FROM TABLE(
+ HOP(TABLE bid, DESCRIPTOR(`dateTime`), INTERVAL '2' SECOND, INTERVAL '10' SECOND))
+ GROUP BY auction, window_start, window_end
+ ) AS CountBids
+ GROUP BY CountBids.starttime, CountBids.endtime
+ ) AS MaxBids
+ ON AuctionBids.starttime = MaxBids.starttime AND
+ AuctionBids.endtime = MaxBids.endtime AND
+ AuctionBids.num >= MaxBids.maxn;
diff --git a/gluten-flink/ut/src/test/resources/nexmark/q7.sql b/gluten-flink/ut/src/test/resources/nexmark/q7.sql
new file mode 100644
index 0000000000..1b0ec308e9
--- /dev/null
+++ b/gluten-flink/ut/src/test/resources/nexmark/q7.sql
@@ -0,0 +1,21 @@
+CREATE TABLE nexmark_q7 (
+ auction BIGINT,
+ bidder BIGINT,
+ price BIGINT,
+ `dateTime` TIMESTAMP(3),
+ extra VARCHAR
+) WITH (
+ 'connector' = 'blackhole'
+);
+
+INSERT INTO nexmark_q7
+SELECT B.auction, B.price, B.bidder, B.`dateTime`, B.extra
+from bid B
+JOIN (
+ SELECT MAX(price) AS maxprice, window_end as `dateTime`
+ FROM TABLE(
+ TUMBLE(TABLE bid, DESCRIPTOR(`dateTime`), INTERVAL '10' SECOND))
+ GROUP BY window_start, window_end
+) B1
+ON B.price = B1.maxprice
+WHERE B.`dateTime` BETWEEN B1.`dateTime` - INTERVAL '10' SECOND AND B1.`dateTime`;
diff --git a/gluten-flink/ut/src/test/resources/nexmark/q8.sql b/gluten-flink/ut/src/test/resources/nexmark/q8.sql
new file mode 100644
index 0000000000..1b112f9fc7
--- /dev/null
+++ b/gluten-flink/ut/src/test/resources/nexmark/q8.sql
@@ -0,0 +1,27 @@
+CREATE TABLE nexmark_q8 (
+ id BIGINT,
+ name VARCHAR,
+ stime TIMESTAMP(3)
+) WITH (
+ 'connector' = 'blackhole'
+);
+
+INSERT INTO nexmark_q8
+SELECT P.id, P.name, P.starttime
+FROM (
+ SELECT id, name,
+ window_start AS starttime,
+ window_end AS endtime
+ FROM TABLE(
+ TUMBLE(TABLE person, DESCRIPTOR(`dateTime`), INTERVAL '10' SECOND))
+ GROUP BY id, name, window_start, window_end
+) P
+JOIN (
+ SELECT seller,
+ window_start AS starttime,
+ window_end AS endtime
+ FROM TABLE(
+ TUMBLE(TABLE auction, DESCRIPTOR(`dateTime`), INTERVAL '10' SECOND))
+ GROUP BY seller, window_start, window_end
+) A
+ON P.id = A.seller AND P.starttime = A.starttime AND P.endtime = A.endtime;
diff --git a/gluten-flink/ut/src/test/resources/nexmark/q9.sql b/gluten-flink/ut/src/test/resources/nexmark/q9.sql
new file mode 100644
index 0000000000..60f2536084
--- /dev/null
+++ b/gluten-flink/ut/src/test/resources/nexmark/q9.sql
@@ -0,0 +1,31 @@
+CREATE TABLE nexmark_q9 (
+ id BIGINT,
+ itemName VARCHAR,
+ description VARCHAR,
+ initialBid BIGINT,
+ reserve BIGINT,
+ `dateTime` TIMESTAMP(3),
+ expires TIMESTAMP(3),
+ seller BIGINT,
+ category BIGINT,
+ extra VARCHAR,
+ auction BIGINT,
+ bidder BIGINT,
+ price BIGINT,
+ bid_dateTime TIMESTAMP(3),
+ bid_extra VARCHAR
+) WITH (
+ 'connector' = 'blackhole'
+);
+
+INSERT INTO nexmark_q9
+SELECT
+ id, itemName, description, initialBid, reserve, `dateTime`, expires, seller, category, extra,
+ auction, bidder, price, bid_dateTime, bid_extra
+FROM (
+ SELECT A.*, B.auction, B.bidder, B.price, B.`dateTime` AS bid_dateTime, B.extra AS bid_extra,
+ ROW_NUMBER() OVER (PARTITION BY A.id ORDER BY B.price DESC, B.`dateTime` ASC) AS rownum
+ FROM auction A, bid B
+ WHERE A.id = B.auction AND B.`dateTime` BETWEEN A.`dateTime` AND A.expires
+)
+WHERE rownum <= 1;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]