gustavodemorais commented on code in PR #26113: URL: https://github.com/apache/flink/pull/26113#discussion_r1961296377
########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/UnnestITCase.scala: ########## @@ -312,71 +306,509 @@ class UnnestITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mo (2, Array((13, "41.6"), (14, "45.2136"))), (3, Array((18, "42.6"))) ) - val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b) - tEnv.createTemporaryView("T", t) + assertUnnest( + testData = data, + typeInfo = createTypeInformation[(Int, Array[(Int, String)])], + sqlQuery = "SELECT a, b, A._1, A._2 FROM T, UNNEST(T.b) AS A where A._1 > 13", + expectedResults = List("2,[13,41.6, 14,45.2136],14,45.2136", "3,[18,42.6],18,42.6"), + isRetract = false, + parallelism = -1, + fieldNames = 'a, + 'b + ) + } - val sqlQuery = "SELECT a, b, A._1, A._2 FROM T, UNNEST(T.b) AS A where A._1 > 13" - val result = tEnv.sqlQuery(sqlQuery).toDataStream - val sink = new TestingAppendSink - result.addSink(sink) - env.execute() + @TestTemplate + def testUnnestArrayOfRowsWithNestedFilter(): Unit = { + val data = List( + (1, Array((12, "45.6"), (12, "45.612"))), + (2, Array((13, "41.6"), (14, "45.2136"))), + (3, Array((18, "42.6"))) + ) + assertUnnest( + testData = data, + typeInfo = createTypeInformation[(Int, Array[(Int, String)])], + sqlQuery = """ + |SELECT * FROM ( + | SELECT a, b1, b2 FROM + | (SELECT a, b FROM T) T2 + | CROSS JOIN + | UNNEST(T2.b) as S(b1, b2) + | WHERE S.b1 >= 12 + | ) tmp + |WHERE b2 <> '42.6' + |""".stripMargin, + expectedResults = List("1,12,45.612", "1,12,45.6", "2,13,41.6", "2,14,45.2136"), + isRetract = false, + parallelism = -1, + fieldNames = 'a, + 'b + ) + } - val expected = List("2,[13,41.6, 14,45.2136],14,45.2136", "3,[18,42.6],18,42.6") - assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted) + @TestTemplate + def testUnnestWithValuesStream(): Unit = { + assertUnnest( + testData = List(1), + typeInfo = createTypeInformation[Int], + sqlQuery = "SELECT * FROM UNNEST(ARRAY[1,2,3])", + 
expectedResults = List("1", "2", "3"), + isRetract = false, + parallelism = -1, + fieldNames = 'dummy + ) + } + + @TestTemplate + def testUnnestWithValuesStream2(): Unit = { + assertUnnest( + testData = List(1), + typeInfo = createTypeInformation[Int], + sqlQuery = "SELECT * FROM (VALUES('a')) CROSS JOIN UNNEST(ARRAY[1, 2, 3])", + expectedResults = List("a,1", "a,2", "a,3"), + isRetract = false, + parallelism = -1, + fieldNames = 'dummy + ) + } + + @TestTemplate + def testUnnestWithOrdinalityWithValuesStream(): Unit = { + assertUnnest( + testData = List(1), + typeInfo = createTypeInformation[Int], + sqlQuery = "SELECT * FROM (VALUES('a')) CROSS JOIN UNNEST(ARRAY[1, 2, 3]) WITH ORDINALITY", + expectedResults = List("a,1,1", "a,2,2", "a,3,3"), + isRetract = false, + parallelism = -1, + fieldNames = 'dummy + ) + } + + @TestTemplate + def testUnnestArrayWithOrdinality(): Unit = { + val data = List( + (1, Array(12, 45)), + (2, Array(41, 5)), + (3, Array(18, 42)) + ) + assertUnnest( + testData = data, + typeInfo = createTypeInformation[(Int, Array[Int])], + sqlQuery = """ + |SELECT a, number, ordinality + |FROM T CROSS JOIN UNNEST(b) WITH ORDINALITY AS t(number, ordinality) + |""".stripMargin, + expectedResults = List("1,12,1", "1,45,2", "2,41,1", "2,5,2", "3,18,1", "3,42,2"), + isRetract = false, + parallelism = -1, + fieldNames = 'a, + 'b + ) } @TestTemplate - def testUnnestWithNestedFilter(): Unit = { + def testUnnestFromTableWithOrdinality(): Unit = { + val data = List( + (1, 1L, Array("Hi", "w")), + (2, 2L, Array("Hello", "k")), + (3, 2L, Array("Hello world", "x")) + ) + + assertUnnest( + testData = data, + typeInfo = createTypeInformation[(Int, Long, Array[String])], + sqlQuery = "SELECT a, s, o FROM T, UNNEST(T.c) WITH ORDINALITY as A (s, o)", + expectedResults = List("1,Hi,1", "1,w,2", "2,Hello,1", "2,k,2", "3,Hello world,1", "3,x,2"), + isRetract = false, + parallelism = -1, + fieldNames = 'a, + 'b, + 'c + ) + } + + @TestTemplate + def 
testUnnestArrayOfArrayWithOrdinality(): Unit = { + val data = List( + (1, Array(Array(1, 2), Array(3))), + (2, Array(Array(4, 5), Array(6, 7))), + (3, Array(Array(8))) + ) + assertUnnest( + testData = data, + typeInfo = createTypeInformation[(Int, Array[Array[Int]])], + sqlQuery = """ + |SELECT id, array_val, array_pos, `element`, element_pos + |FROM T + |CROSS JOIN UNNEST(nested_array) WITH ORDINALITY AS A(array_val, array_pos) + |CROSS JOIN UNNEST(array_val) WITH ORDINALITY AS B(`element`, element_pos) + |""".stripMargin, + expectedResults = List( + "1,[1, 2],1,1,1", + "1,[1, 2],1,2,2", + "1,[3],2,3,1", + "2,[4, 5],1,4,1", + "2,[4, 5],1,5,2", + "2,[6, 7],2,6,1", + "2,[6, 7],2,7,2", + "3,[8],1,8,1"), + isRetract = false, + parallelism = -1, + fieldNames = 'id, + 'nested_array + ) + } + + @TestTemplate + def testUnnestMultisetWithOrdinality(): Unit = { + val data = List( + (1, 1, "Hi"), + (1, 2, "Hello"), + (2, 2, "World"), + (3, 3, "Hello world") + ) + assertUnnest( + testData = data, + typeInfo = createTypeInformation[(Int, Int, String)], + sqlQuery = """ + |WITH T1 AS (SELECT a, COLLECT(c) as words FROM T GROUP BY a) + |SELECT a, word, pos + |FROM T1 CROSS JOIN UNNEST(words) WITH ORDINALITY AS A(word, pos) + |""".stripMargin, + expectedResults = List( + "1,Hi,1", + "1,Hello,2", + "2,World,1", + "3,Hello world,1" + ), + isRetract = true, + parallelism = -1, + fieldNames = 'a, + 'b, + 'c + ) + } + + @TestTemplate + def testUnnestMapWithOrdinality(): Unit = { + val data = List( + Row.of( + Int.box(1), { + val map = new java.util.HashMap[String, String]() + map.put("a", "10") + map.put("b", "11") + map + }), + Row.of( + Int.box(2), { + val map = new java.util.HashMap[String, String]() + map.put("c", "20") + map.put("d", "21") + map + }) + ) + + implicit val typeInfo = Types.ROW( + Array("id", "map_data"), + Array[TypeInformation[_]](Types.INT, Types.MAP(Types.STRING, Types.STRING)) + ) + + assertUnnest( + testData = data, + typeInfo = typeInfo, + sqlQuery = """ + 
|SELECT id, k, v, pos + |FROM T CROSS JOIN UNNEST(map_data) WITH ORDINALITY AS f(k, v, pos) + |""".stripMargin, + expectedResults = List("1,a,10,1", "1,b,11,2", "2,c,20,2", "2,d,21,1"), + isRetract = false, + parallelism = 1, Review Comment: Hmm, two old unnest tests were using it, and that's why I had kept it: https://github.com/apache/flink/pull/26113/files#diff-b15987b33460c38f1db98fee8ac3b8a4cf7f044918fd46f4ba2908e9386effd2L162 They had it before results were sorted, though, so I think we can indeed remove it since we're always sorting now. Removed in https://github.com/apache/flink/pull/26113/commits/767ac268fb52bb86792e36c1b5992312901962dd -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org