gustavodemorais commented on code in PR #26113:
URL: https://github.com/apache/flink/pull/26113#discussion_r1957126571


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/UnnestITCase.scala:
##########
@@ -379,4 +380,486 @@ class UnnestITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mo
     val expected = List("a,1", "a,2", "a,3")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
+
+  @TestTemplate
+  def testUnnestWithOrdinalityWithValuesStream(): Unit = {
+    val sqlQuery = "SELECT * FROM (VALUES('a')) CROSS JOIN UNNEST(ARRAY[1, 2, 
3]) WITH ORDINALITY"
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List("a,1,1", "a,2,2", "a,3,3")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestArrayWithOrdinality(): Unit = {
+    val data = List(
+      (1, Array(12, 45)),
+      (2, Array(41, 5)),
+      (3, Array(18, 42))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery = """
+                     |SELECT a, number, ordinality 
+                     |FROM T CROSS JOIN UNNEST(b) WITH ORDINALITY AS t(number, 
ordinality)
+                     |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List("1,12,1", "1,45,2", "2,41,1", "2,5,2", "3,18,1", 
"3,42,2")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestFromTableWithOrdinality(): Unit = {
+    val data = List(
+      (1, 1L, Array("Hi", "w")),
+      (2, 2L, Array("Hello", "k")),
+      (3, 2L, Array("Hello world", "x"))
+    )
+
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b, 
'c)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery = "SELECT a, s, o FROM T, UNNEST(T.c) WITH ORDINALITY as A 
(s, o)"
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List("1,Hi,1", "1,w,2", "2,Hello,1", "2,k,2", "3,Hello 
world,1", "3,x,2")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestArrayOfArrayWithOrdinality(): Unit = {
+    val data = List(
+      (1, Array(Array(1, 2), Array(3))),
+      (2, Array(Array(4, 5), Array(6, 7))),
+      (3, Array(Array(8)))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'id, 
'nested_array)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery =
+      """
+        |SELECT id, array_val, array_pos, `element`, element_pos
+        |FROM T
+        |CROSS JOIN UNNEST(nested_array) WITH ORDINALITY AS A(array_val, 
array_pos)
+        |CROSS JOIN UNNEST(array_val) WITH ORDINALITY AS B(`element`, 
element_pos)
+        |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "1,[1, 2],1,1,1",
+      "1,[1, 2],1,2,2",
+      "1,[3],2,3,1",
+      "2,[4, 5],1,4,1",
+      "2,[4, 5],1,5,2",
+      "2,[6, 7],2,6,1",
+      "2,[6, 7],2,7,2",
+      "3,[8],1,8,1")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestMultisetWithOrdinality(): Unit = {
+    val data = List(
+      (1, 1, "Hi"),
+      (1, 2, "Hello"),
+      (2, 2, "World"),
+      (3, 3, "Hello world")
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b, 
'c)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery =
+      """
+        |WITH T1 AS (SELECT a, COLLECT(c) as words FROM T GROUP BY a)
+        |SELECT a, word, pos
+        |FROM T1 CROSS JOIN UNNEST(words) WITH ORDINALITY AS A(word, pos)
+        |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
+    val sink = new TestingRetractSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "1,Hi,1",
+      "1,Hello,2",
+      "2,World,1",
+      "3,Hello world,1"
+    )
+    assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestMapWithOrdinality(): Unit = {
+    val data = List(
+      Row.of(
+        Int.box(1), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("a", "10")
+          map.put("b", "11")
+          map
+        }),
+      Row.of(
+        Int.box(2), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("c", "20")
+          map.put("d", "21")
+          map
+        })
+    )
+
+    implicit val typeInfo = Types.ROW(
+      Array("id", "map_data"),
+      Array[TypeInformation[_]](Types.INT, Types.MAP(Types.STRING, 
Types.STRING))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'id, 
'map_data)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery = """
+                     |SELECT id, k, v, pos
+                     |FROM T CROSS JOIN UNNEST(map_data) WITH ORDINALITY AS 
f(k, v, pos)
+                     |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val resultsWithoutordinality = 
assertAndRemoveOrdinality(sink.getAppendResults, 2)
+    val expected = List("1,a,10", "1,b,11", "2,c,20", "2,d,21")
+
+    assertThat(resultsWithoutordinality.sorted).isEqualTo(expected.sorted)
+  }
+
+  def testUnnestForMapOfRowsWitOrdinality(): Unit = {
+    val data = List(
+      Row.of(
+        Int.box(1), {
+          val map = new java.util.HashMap[Row, Row]()
+          map.put(Row.of("a", "a"), Row.of(10: Integer))
+          map.put(Row.of("b", "b"), Row.of(11: Integer))
+          map
+        }),
+      Row.of(
+        Int.box(2), {
+          val map = new java.util.HashMap[Row, Row]()
+          map.put(Row.of("c", "c"), Row.of(20: Integer))
+          map
+        }),
+      Row.of(
+        Int.box(3), {
+          val map = new java.util.HashMap[Row, Row]()
+          map.put(Row.of("d", "d"), Row.of(30: Integer))
+          map.put(Row.of("e", "e"), Row.of(31: Integer))
+          map
+        })
+    )
+
+    implicit val typeInfo = Types.ROW(
+      Array("a", "b"),
+      Array[TypeInformation[_]](
+        Types.INT,
+        Types.MAP(Types.ROW(Types.STRING, Types.STRING), 
Types.ROW(Types.INT())))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery = "SELECT a, k, v, o FROM T CROSS JOIN UNNEST(b) WITH 
ORDINALITY as f (k, v, o)"
+    val result = tEnv.sqlQuery(sqlQuery)
+    TestSinkUtil.addValuesSink(
+      tEnv,
+      "MySink",
+      List("a", "k", "v", "o"),
+      List(
+        DataTypes.INT,
+        DataTypes.ROW(DataTypes.STRING(), DataTypes.STRING()),
+        DataTypes.ROW(DataTypes.INT()),
+        DataTypes.INT.notNull()),
+      ChangelogMode.all()
+    )
+    result.executeInsert("MySink").await()
+
+    val expected =
+      List("1,a,a,10", "1,b,b,11", "2,c,c,20", "3,d,d,30", "3,e,e,31")
+    val resultWithoutOrd = assertAndRemoveOrdinality(
+      TestValuesTableFactory.getResultsAsStrings("MySink").sorted.toList,
+      2)
+    assertThat(resultWithoutOrd).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestWithOrdinalityForChainOfArraysAndMaps(): Unit = {
+    val data = List(
+      Row.of(
+        Int.box(1),
+        Array("a", "b"), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("x", "10")
+          map.put("y", "20")
+          map
+        }),
+      Row.of(
+        Int.box(2),
+        Array("c", "d"), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("z", "30")
+          map.put("w", "40")
+          map
+        })
+    )
+
+    implicit val typeInfo = Types.ROW(
+      Array("id", "array_data", "map_data"),
+      Array[TypeInformation[_]](
+        Types.INT,
+        Types.OBJECT_ARRAY(Types.STRING),
+        Types.MAP(Types.STRING, Types.STRING))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'id, 
'array_data, 'map_data)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery =
+      """
+        |SELECT id, array_val, array_pos, map_key, map_val, map_pos
+        |FROM T
+        |CROSS JOIN UNNEST(array_data) WITH ORDINALITY AS A(array_val, 
array_pos)
+        |CROSS JOIN UNNEST(map_data) WITH ORDINALITY AS B(map_key, map_val, 
map_pos)
+        |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val resultsWithoutOrdinality = 
assertAndRemoveOrdinality(sink.getAppendResults, 2)
+    val expected = List(
+      "1,a,1,x,10",
+      "1,a,1,y,20",
+      "1,b,2,x,10",
+      "1,b,2,y,20",
+      "2,c,1,z,30",
+      "2,c,1,w,40",
+      "2,d,2,z,30",
+      "2,d,2,w,40"
+    )
+
+    assertThat(resultsWithoutOrdinality.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestWithOrdinalityForEmptyArray(): Unit = {
+    val data = List(
+      (1, Array[Int]())
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery = """
+                     |SELECT a, number, ordinality
+                     |FROM T CROSS JOIN UNNEST(b) WITH ORDINALITY AS t(number, 
ordinality)
+                     |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List()
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected)
+  }
+
+  @TestTemplate
+  def testUnnestWithOrdinalityForMapWithNullValues(): Unit = {
+    val data = List(
+      Row.of(
+        Int.box(1), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("a", "10")
+          map.put("b", null)
+          map
+        }),
+      Row.of(
+        Int.box(2), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("c", "20")
+          map.put("d", null)
+          map
+        })
+    )
+
+    implicit val typeInfo = Types.ROW(
+      Array("id", "map_data"),
+      Array[TypeInformation[_]](Types.INT, Types.MAP(Types.STRING, 
Types.STRING))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'id, 
'map_data)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery =
+      """
+        |SELECT id, k, v, pos
+        |FROM T CROSS JOIN UNNEST(map_data) WITH ORDINALITY AS f(k, v, pos)
+        |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val resultsWithoutordinality = 
assertAndRemoveOrdinality(sink.getAppendResults, 2)
+    val expected = List("1,a,10", "1,b,null", "2,c,20", "2,d,null")
+    assertThat(resultsWithoutordinality.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestArrayOfRowsFromTableWithOrdinality(): Unit = {
+    val data = List(
+      (2, Array((20, "41.6"), (14, "45.2136"))),
+      (3, Array((18, "42.6")))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery =
+      "SELECT a, b, s, t, o FROM T, UNNEST(T.b) WITH ORDINALITY AS A(s, t, o)"
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "2,[20,41.6, 14,45.2136],20,41.6,1",
+      "2,[20,41.6, 14,45.2136],14,45.2136,2",
+      "3,[18,42.6],18,42.6,1")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestArrayOfRowsWithNestedFilterWithOrdinality(): Unit = {
+    val data = List(
+      (1, Array((12, "45.6"), (12, "45.612"))),
+      (2, Array((13, "41.6"), (14, "45.2136"))),
+      (3, Array((18, "42.6")))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("MyTable", t)
+
+    val sqlQuery =
+      """
+        |SELECT * FROM (
+        |   SELECT a, b1, b2, ord FROM
+        |       (SELECT a, b FROM MyTable) T
+        |       CROSS JOIN
+        |       UNNEST(T.b) WITH ORDINALITY as S(b1, b2, ord)
+        |       WHERE S.b1 >= 12
+        |   ) tmp
+        |WHERE b2 <> '42.6' AND ord <> 2
+    """.stripMargin
+
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List("1,12,45.6,1", "2,13,41.6,1")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestMultiSetOfRowsFromCollectResultWithOrdinality(): Unit = {
+    val data = List(
+      (1, (12, "45.6")),
+      (2, (12, "45.612")),
+      (2, (13, "41.6")),
+      (3, (14, "45.2136")),
+      (3, (18, "42.6")))
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("T", t)

Review Comment:
   See 
https://github.com/apache/flink/commit/e54bf99dd04c4f8139eeda71472746bb523d6081



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/UnnestITCase.scala:
##########
@@ -379,4 +380,486 @@ class UnnestITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mo
     val expected = List("a,1", "a,2", "a,3")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
+
+  @TestTemplate
+  def testUnnestWithOrdinalityWithValuesStream(): Unit = {
+    val sqlQuery = "SELECT * FROM (VALUES('a')) CROSS JOIN UNNEST(ARRAY[1, 2, 
3]) WITH ORDINALITY"
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List("a,1,1", "a,2,2", "a,3,3")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestArrayWithOrdinality(): Unit = {
+    val data = List(
+      (1, Array(12, 45)),
+      (2, Array(41, 5)),
+      (3, Array(18, 42))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery = """
+                     |SELECT a, number, ordinality 
+                     |FROM T CROSS JOIN UNNEST(b) WITH ORDINALITY AS t(number, 
ordinality)
+                     |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List("1,12,1", "1,45,2", "2,41,1", "2,5,2", "3,18,1", 
"3,42,2")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestFromTableWithOrdinality(): Unit = {
+    val data = List(
+      (1, 1L, Array("Hi", "w")),
+      (2, 2L, Array("Hello", "k")),
+      (3, 2L, Array("Hello world", "x"))
+    )
+
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b, 
'c)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery = "SELECT a, s, o FROM T, UNNEST(T.c) WITH ORDINALITY as A 
(s, o)"
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List("1,Hi,1", "1,w,2", "2,Hello,1", "2,k,2", "3,Hello 
world,1", "3,x,2")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestArrayOfArrayWithOrdinality(): Unit = {
+    val data = List(
+      (1, Array(Array(1, 2), Array(3))),
+      (2, Array(Array(4, 5), Array(6, 7))),
+      (3, Array(Array(8)))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'id, 
'nested_array)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery =
+      """
+        |SELECT id, array_val, array_pos, `element`, element_pos
+        |FROM T
+        |CROSS JOIN UNNEST(nested_array) WITH ORDINALITY AS A(array_val, 
array_pos)
+        |CROSS JOIN UNNEST(array_val) WITH ORDINALITY AS B(`element`, 
element_pos)
+        |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "1,[1, 2],1,1,1",
+      "1,[1, 2],1,2,2",
+      "1,[3],2,3,1",
+      "2,[4, 5],1,4,1",
+      "2,[4, 5],1,5,2",
+      "2,[6, 7],2,6,1",
+      "2,[6, 7],2,7,2",
+      "3,[8],1,8,1")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestMultisetWithOrdinality(): Unit = {
+    val data = List(
+      (1, 1, "Hi"),
+      (1, 2, "Hello"),
+      (2, 2, "World"),
+      (3, 3, "Hello world")
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b, 
'c)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery =
+      """
+        |WITH T1 AS (SELECT a, COLLECT(c) as words FROM T GROUP BY a)
+        |SELECT a, word, pos
+        |FROM T1 CROSS JOIN UNNEST(words) WITH ORDINALITY AS A(word, pos)
+        |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
+    val sink = new TestingRetractSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "1,Hi,1",
+      "1,Hello,2",
+      "2,World,1",
+      "3,Hello world,1"
+    )
+    assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestMapWithOrdinality(): Unit = {
+    val data = List(
+      Row.of(
+        Int.box(1), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("a", "10")
+          map.put("b", "11")
+          map
+        }),
+      Row.of(
+        Int.box(2), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("c", "20")
+          map.put("d", "21")
+          map
+        })
+    )
+
+    implicit val typeInfo = Types.ROW(
+      Array("id", "map_data"),
+      Array[TypeInformation[_]](Types.INT, Types.MAP(Types.STRING, 
Types.STRING))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'id, 
'map_data)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery = """
+                     |SELECT id, k, v, pos
+                     |FROM T CROSS JOIN UNNEST(map_data) WITH ORDINALITY AS 
f(k, v, pos)
+                     |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val resultsWithoutordinality = 
assertAndRemoveOrdinality(sink.getAppendResults, 2)
+    val expected = List("1,a,10", "1,b,11", "2,c,20", "2,d,21")
+
+    assertThat(resultsWithoutordinality.sorted).isEqualTo(expected.sorted)
+  }
+
+  def testUnnestForMapOfRowsWitOrdinality(): Unit = {
+    val data = List(
+      Row.of(
+        Int.box(1), {
+          val map = new java.util.HashMap[Row, Row]()
+          map.put(Row.of("a", "a"), Row.of(10: Integer))
+          map.put(Row.of("b", "b"), Row.of(11: Integer))
+          map
+        }),
+      Row.of(
+        Int.box(2), {
+          val map = new java.util.HashMap[Row, Row]()
+          map.put(Row.of("c", "c"), Row.of(20: Integer))
+          map
+        }),
+      Row.of(
+        Int.box(3), {
+          val map = new java.util.HashMap[Row, Row]()
+          map.put(Row.of("d", "d"), Row.of(30: Integer))
+          map.put(Row.of("e", "e"), Row.of(31: Integer))
+          map
+        })
+    )
+
+    implicit val typeInfo = Types.ROW(
+      Array("a", "b"),
+      Array[TypeInformation[_]](
+        Types.INT,
+        Types.MAP(Types.ROW(Types.STRING, Types.STRING), 
Types.ROW(Types.INT())))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery = "SELECT a, k, v, o FROM T CROSS JOIN UNNEST(b) WITH 
ORDINALITY as f (k, v, o)"
+    val result = tEnv.sqlQuery(sqlQuery)
+    TestSinkUtil.addValuesSink(
+      tEnv,
+      "MySink",
+      List("a", "k", "v", "o"),
+      List(
+        DataTypes.INT,
+        DataTypes.ROW(DataTypes.STRING(), DataTypes.STRING()),
+        DataTypes.ROW(DataTypes.INT()),
+        DataTypes.INT.notNull()),
+      ChangelogMode.all()
+    )
+    result.executeInsert("MySink").await()
+
+    val expected =
+      List("1,a,a,10", "1,b,b,11", "2,c,c,20", "3,d,d,30", "3,e,e,31")
+    val resultWithoutOrd = assertAndRemoveOrdinality(
+      TestValuesTableFactory.getResultsAsStrings("MySink").sorted.toList,
+      2)
+    assertThat(resultWithoutOrd).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestWithOrdinalityForChainOfArraysAndMaps(): Unit = {
+    val data = List(
+      Row.of(
+        Int.box(1),
+        Array("a", "b"), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("x", "10")
+          map.put("y", "20")
+          map
+        }),
+      Row.of(
+        Int.box(2),
+        Array("c", "d"), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("z", "30")
+          map.put("w", "40")
+          map
+        })
+    )
+
+    implicit val typeInfo = Types.ROW(
+      Array("id", "array_data", "map_data"),
+      Array[TypeInformation[_]](
+        Types.INT,
+        Types.OBJECT_ARRAY(Types.STRING),
+        Types.MAP(Types.STRING, Types.STRING))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'id, 
'array_data, 'map_data)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery =
+      """
+        |SELECT id, array_val, array_pos, map_key, map_val, map_pos
+        |FROM T
+        |CROSS JOIN UNNEST(array_data) WITH ORDINALITY AS A(array_val, 
array_pos)
+        |CROSS JOIN UNNEST(map_data) WITH ORDINALITY AS B(map_key, map_val, 
map_pos)
+        |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val resultsWithoutOrdinality = 
assertAndRemoveOrdinality(sink.getAppendResults, 2)
+    val expected = List(
+      "1,a,1,x,10",
+      "1,a,1,y,20",
+      "1,b,2,x,10",
+      "1,b,2,y,20",
+      "2,c,1,z,30",
+      "2,c,1,w,40",
+      "2,d,2,z,30",
+      "2,d,2,w,40"
+    )
+
+    assertThat(resultsWithoutOrdinality.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestWithOrdinalityForEmptyArray(): Unit = {
+    val data = List(
+      (1, Array[Int]())
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery = """
+                     |SELECT a, number, ordinality
+                     |FROM T CROSS JOIN UNNEST(b) WITH ORDINALITY AS t(number, 
ordinality)
+                     |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List()
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected)
+  }
+
+  @TestTemplate
+  def testUnnestWithOrdinalityForMapWithNullValues(): Unit = {
+    val data = List(
+      Row.of(
+        Int.box(1), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("a", "10")
+          map.put("b", null)
+          map
+        }),
+      Row.of(
+        Int.box(2), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("c", "20")
+          map.put("d", null)
+          map
+        })
+    )
+
+    implicit val typeInfo = Types.ROW(
+      Array("id", "map_data"),
+      Array[TypeInformation[_]](Types.INT, Types.MAP(Types.STRING, 
Types.STRING))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'id, 
'map_data)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery =
+      """
+        |SELECT id, k, v, pos
+        |FROM T CROSS JOIN UNNEST(map_data) WITH ORDINALITY AS f(k, v, pos)
+        |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val resultsWithoutordinality = 
assertAndRemoveOrdinality(sink.getAppendResults, 2)
+    val expected = List("1,a,10", "1,b,null", "2,c,20", "2,d,null")
+    assertThat(resultsWithoutordinality.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestArrayOfRowsFromTableWithOrdinality(): Unit = {
+    val data = List(
+      (2, Array((20, "41.6"), (14, "45.2136"))),
+      (3, Array((18, "42.6")))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery =
+      "SELECT a, b, s, t, o FROM T, UNNEST(T.b) WITH ORDINALITY AS A(s, t, o)"
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "2,[20,41.6, 14,45.2136],20,41.6,1",
+      "2,[20,41.6, 14,45.2136],14,45.2136,2",
+      "3,[18,42.6],18,42.6,1")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestArrayOfRowsWithNestedFilterWithOrdinality(): Unit = {
+    val data = List(
+      (1, Array((12, "45.6"), (12, "45.612"))),
+      (2, Array((13, "41.6"), (14, "45.2136"))),
+      (3, Array((18, "42.6")))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("MyTable", t)
+
+    val sqlQuery =
+      """
+        |SELECT * FROM (
+        |   SELECT a, b1, b2, ord FROM
+        |       (SELECT a, b FROM MyTable) T
+        |       CROSS JOIN
+        |       UNNEST(T.b) WITH ORDINALITY as S(b1, b2, ord)
+        |       WHERE S.b1 >= 12
+        |   ) tmp
+        |WHERE b2 <> '42.6' AND ord <> 2
+    """.stripMargin
+
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List("1,12,45.6,1", "2,13,41.6,1")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestMultiSetOfRowsFromCollectResultWithOrdinality(): Unit = {
+    val data = List(
+      (1, (12, "45.6")),
+      (2, (12, "45.612")),
+      (2, (13, "41.6")),
+      (3, (14, "45.2136")),
+      (3, (18, "42.6")))
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("T", t)

Review Comment:
   After a decent amount of refactoring, I've parameterized almost all 
unnest stream tests, including those without ordinality, to use an `assertUnnest` 
helper function.
    
   In total, I refactored 24 tests and left 2 out because they use a 
failingDataSource, and I thought it wasn't worth adding one more param to every 
test just because of those two. It's still a decent amount of code, since there 
are some language restrictions, like the variadic argument for fieldNames, which 
mean we can't have default values for params. I will definitely go with the 
new testing patterns in the future. At the same time, I'm not sure it's 
worth investing more time in this, since we'll eventually have to port these to Java.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to