gustavodemorais commented on code in PR #26113:
URL: https://github.com/apache/flink/pull/26113#discussion_r1961296377


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/UnnestITCase.scala:
##########
@@ -312,71 +306,509 @@ class UnnestITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mo
       (2, Array((13, "41.6"), (14, "45.2136"))),
       (3, Array((18, "42.6")))
     )
-    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
-    tEnv.createTemporaryView("T", t)
+    assertUnnest(
+      testData = data,
+      typeInfo = createTypeInformation[(Int, Array[(Int, String)])],
+      sqlQuery = "SELECT a, b, A._1, A._2 FROM T, UNNEST(T.b) AS A where A._1 
> 13",
+      expectedResults = List("2,[13,41.6, 14,45.2136],14,45.2136", 
"3,[18,42.6],18,42.6"),
+      isRetract = false,
+      parallelism = -1,
+      fieldNames = 'a,
+      'b
+    )
+  }
 
-    val sqlQuery = "SELECT a, b, A._1, A._2 FROM T, UNNEST(T.b) AS A where A._1 > 13"
-    val result = tEnv.sqlQuery(sqlQuery).toDataStream
-    val sink = new TestingAppendSink
-    result.addSink(sink)
-    env.execute()
+  @TestTemplate
+  def testUnnestArrayOfRowsWithNestedFilter(): Unit = {
+    val data = List(
+      (1, Array((12, "45.6"), (12, "45.612"))),
+      (2, Array((13, "41.6"), (14, "45.2136"))),
+      (3, Array((18, "42.6")))
+    )
+    assertUnnest(
+      testData = data,
+      typeInfo = createTypeInformation[(Int, Array[(Int, String)])],
+      sqlQuery = """
+                   |SELECT * FROM (
+                   |   SELECT a, b1, b2 FROM
+                   |       (SELECT a, b FROM T) T2
+                   |       CROSS JOIN
+                   |       UNNEST(T2.b) as S(b1, b2)
+                   |       WHERE S.b1 >= 12
+                   |   ) tmp
+                   |WHERE b2 <> '42.6'
+                   |""".stripMargin,
+      expectedResults = List("1,12,45.612", "1,12,45.6", "2,13,41.6", 
"2,14,45.2136"),
+      isRetract = false,
+      parallelism = -1,
+      fieldNames = 'a,
+      'b
+    )
+  }
 
-    val expected = List("2,[13,41.6, 14,45.2136],14,45.2136", "3,[18,42.6],18,42.6")
-    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  @TestTemplate
+  def testUnnestWithValuesStream(): Unit = {
+    assertUnnest(
+      testData = List(1),
+      typeInfo = createTypeInformation[Int],
+      sqlQuery = "SELECT * FROM UNNEST(ARRAY[1,2,3])",
+      expectedResults = List("1", "2", "3"),
+      isRetract = false,
+      parallelism = -1,
+      fieldNames = 'dummy
+    )
+  }
+
+  @TestTemplate
+  def testUnnestWithValuesStream2(): Unit = {
+    assertUnnest(
+      testData = List(1),
+      typeInfo = createTypeInformation[Int],
+      sqlQuery = "SELECT * FROM (VALUES('a')) CROSS JOIN UNNEST(ARRAY[1, 2, 
3])",
+      expectedResults = List("a,1", "a,2", "a,3"),
+      isRetract = false,
+      parallelism = -1,
+      fieldNames = 'dummy
+    )
+  }
+
+  @TestTemplate
+  def testUnnestWithOrdinalityWithValuesStream(): Unit = {
+    assertUnnest(
+      testData = List(1),
+      typeInfo = createTypeInformation[Int],
+      sqlQuery = "SELECT * FROM (VALUES('a')) CROSS JOIN UNNEST(ARRAY[1, 2, 
3]) WITH ORDINALITY",
+      expectedResults = List("a,1,1", "a,2,2", "a,3,3"),
+      isRetract = false,
+      parallelism = -1,
+      fieldNames = 'dummy
+    )
+  }
+
+  @TestTemplate
+  def testUnnestArrayWithOrdinality(): Unit = {
+    val data = List(
+      (1, Array(12, 45)),
+      (2, Array(41, 5)),
+      (3, Array(18, 42))
+    )
+    assertUnnest(
+      testData = data,
+      typeInfo = createTypeInformation[(Int, Array[Int])],
+      sqlQuery = """
+                   |SELECT a, number, ordinality 
+                   |FROM T CROSS JOIN UNNEST(b) WITH ORDINALITY AS t(number, ordinality)
+                   |""".stripMargin,
+      expectedResults = List("1,12,1", "1,45,2", "2,41,1", "2,5,2", "3,18,1", 
"3,42,2"),
+      isRetract = false,
+      parallelism = -1,
+      fieldNames = 'a,
+      'b
+    )
   }
 
   @TestTemplate
-  def testUnnestWithNestedFilter(): Unit = {
+  def testUnnestFromTableWithOrdinality(): Unit = {
+    val data = List(
+      (1, 1L, Array("Hi", "w")),
+      (2, 2L, Array("Hello", "k")),
+      (3, 2L, Array("Hello world", "x"))
+    )
+
+    assertUnnest(
+      testData = data,
+      typeInfo = createTypeInformation[(Int, Long, Array[String])],
+      sqlQuery = "SELECT a, s, o FROM T, UNNEST(T.c) WITH ORDINALITY as A (s, 
o)",
+      expectedResults = List("1,Hi,1", "1,w,2", "2,Hello,1", "2,k,2", "3,Hello 
world,1", "3,x,2"),
+      isRetract = false,
+      parallelism = -1,
+      fieldNames = 'a,
+      'b,
+      'c
+    )
+  }
+
+  @TestTemplate
+  def testUnnestArrayOfArrayWithOrdinality(): Unit = {
+    val data = List(
+      (1, Array(Array(1, 2), Array(3))),
+      (2, Array(Array(4, 5), Array(6, 7))),
+      (3, Array(Array(8)))
+    )
+    assertUnnest(
+      testData = data,
+      typeInfo = createTypeInformation[(Int, Array[Array[Int]])],
+      sqlQuery = """
+                   |SELECT id, array_val, array_pos, `element`, element_pos
+                   |FROM T
+                   |CROSS JOIN UNNEST(nested_array) WITH ORDINALITY AS A(array_val, array_pos)
+                   |CROSS JOIN UNNEST(array_val) WITH ORDINALITY AS B(`element`, element_pos)
+                   |""".stripMargin,
+      expectedResults = List(
+        "1,[1, 2],1,1,1",
+        "1,[1, 2],1,2,2",
+        "1,[3],2,3,1",
+        "2,[4, 5],1,4,1",
+        "2,[4, 5],1,5,2",
+        "2,[6, 7],2,6,1",
+        "2,[6, 7],2,7,2",
+        "3,[8],1,8,1"),
+      isRetract = false,
+      parallelism = -1,
+      fieldNames = 'id,
+      'nested_array
+    )
+  }
+
+  @TestTemplate
+  def testUnnestMultisetWithOrdinality(): Unit = {
+    val data = List(
+      (1, 1, "Hi"),
+      (1, 2, "Hello"),
+      (2, 2, "World"),
+      (3, 3, "Hello world")
+    )
+    assertUnnest(
+      testData = data,
+      typeInfo = createTypeInformation[(Int, Int, String)],
+      sqlQuery = """
+                   |WITH T1 AS (SELECT a, COLLECT(c) as words FROM T GROUP BY a)
+                   |SELECT a, word, pos
+                   |FROM T1 CROSS JOIN UNNEST(words) WITH ORDINALITY AS A(word, pos)
+                   |""".stripMargin,
+      expectedResults = List(
+        "1,Hi,1",
+        "1,Hello,2",
+        "2,World,1",
+        "3,Hello world,1"
+      ),
+      isRetract = true,
+      parallelism = -1,
+      fieldNames = 'a,
+      'b,
+      'c
+    )
+  }
+
+  @TestTemplate
+  def testUnnestMapWithOrdinality(): Unit = {
+    val data = List(
+      Row.of(
+        Int.box(1), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("a", "10")
+          map.put("b", "11")
+          map
+        }),
+      Row.of(
+        Int.box(2), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("c", "20")
+          map.put("d", "21")
+          map
+        })
+    )
+
+    implicit val typeInfo = Types.ROW(
+      Array("id", "map_data"),
+      Array[TypeInformation[_]](Types.INT, Types.MAP(Types.STRING, Types.STRING))
+    )
+
+    assertUnnest(
+      testData = data,
+      typeInfo = typeInfo,
+      sqlQuery = """
+                   |SELECT id, k, v, pos
+                   |FROM T CROSS JOIN UNNEST(map_data) WITH ORDINALITY AS f(k, v, pos)
+                   |""".stripMargin,
+      expectedResults = List("1,a,10,1", "1,b,11,2", "2,c,20,2", "2,d,21,1"),
+      isRetract = false,
+      parallelism = 1,

Review Comment:
   Hmm, two old unnest tests were using it, which is why I had kept it: https://github.com/apache/flink/pull/26113/files#diff-b15987b33460c38f1db98fee8ac3b8a4cf7f044918fd46f4ba2908e9386effd2L162
   
   But they only needed it before the results were sorted; since we now always sort the results, I think we can indeed remove it. Removed in https://github.com/apache/flink/pull/26113/commits/767ac268fb52bb86792e36c1b5992312901962dd
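   
   For context, the sorting is what makes the parallelism pin unnecessary. A minimal sketch of the idea, assuming the helper still collects rows into a `TestingAppendSink` the way the old tests did (the method name below is illustrative, not the actual `assertUnnest` signature):
   
   ```scala
   import org.assertj.core.api.Assertions.assertThat
   
   // Hypothetical helper: compare results order-insensitively by sorting both sides,
   // so the assertion stays deterministic regardless of the sink's parallelism.
   private def assertSortedAppendResults(sink: TestingAppendSink, expected: List[String]): Unit = {
     // Rows may arrive in any order from parallel subtasks; sorting normalizes that.
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
   ```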
 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
