gustavodemorais commented on code in PR #26113: URL: https://github.com/apache/flink/pull/26113#discussion_r1957126571
########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/UnnestITCase.scala: ########## @@ -379,4 +380,486 @@ class UnnestITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mo val expected = List("a,1", "a,2", "a,3") assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted) } + + @TestTemplate + def testUnnestWithOrdinalityWithValuesStream(): Unit = { + val sqlQuery = "SELECT * FROM (VALUES('a')) CROSS JOIN UNNEST(ARRAY[1, 2, 3]) WITH ORDINALITY" + val result = tEnv.sqlQuery(sqlQuery).toDataStream + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val expected = List("a,1,1", "a,2,2", "a,3,3") + assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted) + } + + @TestTemplate + def testUnnestArrayWithOrdinality(): Unit = { + val data = List( + (1, Array(12, 45)), + (2, Array(41, 5)), + (3, Array(18, 42)) + ) + val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b) + tEnv.createTemporaryView("T", t) + + val sqlQuery = """ + |SELECT a, number, ordinality + |FROM T CROSS JOIN UNNEST(b) WITH ORDINALITY AS t(number, ordinality) + |""".stripMargin + val result = tEnv.sqlQuery(sqlQuery).toDataStream + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val expected = List("1,12,1", "1,45,2", "2,41,1", "2,5,2", "3,18,1", "3,42,2") + assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted) + } + + @TestTemplate + def testUnnestFromTableWithOrdinality(): Unit = { + val data = List( + (1, 1L, Array("Hi", "w")), + (2, 2L, Array("Hello", "k")), + (3, 2L, Array("Hello world", "x")) + ) + + val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b, 'c) + tEnv.createTemporaryView("T", t) + + val sqlQuery = "SELECT a, s, o FROM T, UNNEST(T.c) WITH ORDINALITY as A (s, o)" + val result = tEnv.sqlQuery(sqlQuery).toDataStream + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val 
expected = List("1,Hi,1", "1,w,2", "2,Hello,1", "2,k,2", "3,Hello world,1", "3,x,2") + assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted) + } + + @TestTemplate + def testUnnestArrayOfArrayWithOrdinality(): Unit = { + val data = List( + (1, Array(Array(1, 2), Array(3))), + (2, Array(Array(4, 5), Array(6, 7))), + (3, Array(Array(8))) + ) + val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'id, 'nested_array) + tEnv.createTemporaryView("T", t) + + val sqlQuery = + """ + |SELECT id, array_val, array_pos, `element`, element_pos + |FROM T + |CROSS JOIN UNNEST(nested_array) WITH ORDINALITY AS A(array_val, array_pos) + |CROSS JOIN UNNEST(array_val) WITH ORDINALITY AS B(`element`, element_pos) + |""".stripMargin + val result = tEnv.sqlQuery(sqlQuery).toDataStream + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val expected = List( + "1,[1, 2],1,1,1", + "1,[1, 2],1,2,2", + "1,[3],2,3,1", + "2,[4, 5],1,4,1", + "2,[4, 5],1,5,2", + "2,[6, 7],2,6,1", + "2,[6, 7],2,7,2", + "3,[8],1,8,1") + assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted) + } + + @TestTemplate + def testUnnestMultisetWithOrdinality(): Unit = { + val data = List( + (1, 1, "Hi"), + (1, 2, "Hello"), + (2, 2, "World"), + (3, 3, "Hello world") + ) + val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b, 'c) + tEnv.createTemporaryView("T", t) + + val sqlQuery = + """ + |WITH T1 AS (SELECT a, COLLECT(c) as words FROM T GROUP BY a) + |SELECT a, word, pos + |FROM T1 CROSS JOIN UNNEST(words) WITH ORDINALITY AS A(word, pos) + |""".stripMargin + val result = tEnv.sqlQuery(sqlQuery).toRetractStream[Row] + val sink = new TestingRetractSink + result.addSink(sink) + env.execute() + + val expected = List( + "1,Hi,1", + "1,Hello,2", + "2,World,1", + "3,Hello world,1" + ) + assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted) + } + + @TestTemplate + def testUnnestMapWithOrdinality(): Unit = { + val data = List( + 
Row.of( + Int.box(1), { + val map = new java.util.HashMap[String, String]() + map.put("a", "10") + map.put("b", "11") + map + }), + Row.of( + Int.box(2), { + val map = new java.util.HashMap[String, String]() + map.put("c", "20") + map.put("d", "21") + map + }) + ) + + implicit val typeInfo = Types.ROW( + Array("id", "map_data"), + Array[TypeInformation[_]](Types.INT, Types.MAP(Types.STRING, Types.STRING)) + ) + val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'id, 'map_data) + tEnv.createTemporaryView("T", t) + + val sqlQuery = """ + |SELECT id, k, v, pos + |FROM T CROSS JOIN UNNEST(map_data) WITH ORDINALITY AS f(k, v, pos) + |""".stripMargin + val result = tEnv.sqlQuery(sqlQuery).toDataStream + + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val resultsWithoutordinality = assertAndRemoveOrdinality(sink.getAppendResults, 2) + val expected = List("1,a,10", "1,b,11", "2,c,20", "2,d,21") + + assertThat(resultsWithoutordinality.sorted).isEqualTo(expected.sorted) + } + + def testUnnestForMapOfRowsWitOrdinality(): Unit = { + val data = List( + Row.of( + Int.box(1), { + val map = new java.util.HashMap[Row, Row]() + map.put(Row.of("a", "a"), Row.of(10: Integer)) + map.put(Row.of("b", "b"), Row.of(11: Integer)) + map + }), + Row.of( + Int.box(2), { + val map = new java.util.HashMap[Row, Row]() + map.put(Row.of("c", "c"), Row.of(20: Integer)) + map + }), + Row.of( + Int.box(3), { + val map = new java.util.HashMap[Row, Row]() + map.put(Row.of("d", "d"), Row.of(30: Integer)) + map.put(Row.of("e", "e"), Row.of(31: Integer)) + map + }) + ) + + implicit val typeInfo = Types.ROW( + Array("a", "b"), + Array[TypeInformation[_]]( + Types.INT, + Types.MAP(Types.ROW(Types.STRING, Types.STRING), Types.ROW(Types.INT()))) + ) + val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b) + tEnv.createTemporaryView("T", t) + + val sqlQuery = "SELECT a, k, v, o FROM T CROSS JOIN UNNEST(b) WITH ORDINALITY as f (k, v, o)" + val result 
= tEnv.sqlQuery(sqlQuery) + TestSinkUtil.addValuesSink( + tEnv, + "MySink", + List("a", "k", "v", "o"), + List( + DataTypes.INT, + DataTypes.ROW(DataTypes.STRING(), DataTypes.STRING()), + DataTypes.ROW(DataTypes.INT()), + DataTypes.INT.notNull()), + ChangelogMode.all() + ) + result.executeInsert("MySink").await() + + val expected = + List("1,a,a,10", "1,b,b,11", "2,c,c,20", "3,d,d,30", "3,e,e,31") + val resultWithoutOrd = assertAndRemoveOrdinality( + TestValuesTableFactory.getResultsAsStrings("MySink").sorted.toList, + 2) + assertThat(resultWithoutOrd).isEqualTo(expected.sorted) + } + + @TestTemplate + def testUnnestWithOrdinalityForChainOfArraysAndMaps(): Unit = { + val data = List( + Row.of( + Int.box(1), + Array("a", "b"), { + val map = new java.util.HashMap[String, String]() + map.put("x", "10") + map.put("y", "20") + map + }), + Row.of( + Int.box(2), + Array("c", "d"), { + val map = new java.util.HashMap[String, String]() + map.put("z", "30") + map.put("w", "40") + map + }) + ) + + implicit val typeInfo = Types.ROW( + Array("id", "array_data", "map_data"), + Array[TypeInformation[_]]( + Types.INT, + Types.OBJECT_ARRAY(Types.STRING), + Types.MAP(Types.STRING, Types.STRING)) + ) + val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'id, 'array_data, 'map_data) + tEnv.createTemporaryView("T", t) + + val sqlQuery = + """ + |SELECT id, array_val, array_pos, map_key, map_val, map_pos + |FROM T + |CROSS JOIN UNNEST(array_data) WITH ORDINALITY AS A(array_val, array_pos) + |CROSS JOIN UNNEST(map_data) WITH ORDINALITY AS B(map_key, map_val, map_pos) + |""".stripMargin + val result = tEnv.sqlQuery(sqlQuery).toDataStream + + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val resultsWithoutOrdinality = assertAndRemoveOrdinality(sink.getAppendResults, 2) + val expected = List( + "1,a,1,x,10", + "1,a,1,y,20", + "1,b,2,x,10", + "1,b,2,y,20", + "2,c,1,z,30", + "2,c,1,w,40", + "2,d,2,z,30", + "2,d,2,w,40" + ) + + 
assertThat(resultsWithoutOrdinality.sorted).isEqualTo(expected.sorted) + } + + @TestTemplate + def testUnnestWithOrdinalityForEmptyArray(): Unit = { + val data = List( + (1, Array[Int]()) + ) + val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b) + tEnv.createTemporaryView("T", t) + + val sqlQuery = """ + |SELECT a, number, ordinality + |FROM T CROSS JOIN UNNEST(b) WITH ORDINALITY AS t(number, ordinality) + |""".stripMargin + val result = tEnv.sqlQuery(sqlQuery).toDataStream + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val expected = List() + assertThat(sink.getAppendResults.sorted).isEqualTo(expected) + } + + @TestTemplate + def testUnnestWithOrdinalityForMapWithNullValues(): Unit = { + val data = List( + Row.of( + Int.box(1), { + val map = new java.util.HashMap[String, String]() + map.put("a", "10") + map.put("b", null) + map + }), + Row.of( + Int.box(2), { + val map = new java.util.HashMap[String, String]() + map.put("c", "20") + map.put("d", null) + map + }) + ) + + implicit val typeInfo = Types.ROW( + Array("id", "map_data"), + Array[TypeInformation[_]](Types.INT, Types.MAP(Types.STRING, Types.STRING)) + ) + val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'id, 'map_data) + tEnv.createTemporaryView("T", t) + + val sqlQuery = + """ + |SELECT id, k, v, pos + |FROM T CROSS JOIN UNNEST(map_data) WITH ORDINALITY AS f(k, v, pos) + |""".stripMargin + val result = tEnv.sqlQuery(sqlQuery).toDataStream + + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val resultsWithoutordinality = assertAndRemoveOrdinality(sink.getAppendResults, 2) + val expected = List("1,a,10", "1,b,null", "2,c,20", "2,d,null") + assertThat(resultsWithoutordinality.sorted).isEqualTo(expected.sorted) + } + + @TestTemplate + def testUnnestArrayOfRowsFromTableWithOrdinality(): Unit = { + val data = List( + (2, Array((20, "41.6"), (14, "45.2136"))), + (3, Array((18, "42.6"))) + ) + val t = 
StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b) + tEnv.createTemporaryView("T", t) + + val sqlQuery = + "SELECT a, b, s, t, o FROM T, UNNEST(T.b) WITH ORDINALITY AS A(s, t, o)" + val result = tEnv.sqlQuery(sqlQuery).toDataStream + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val expected = List( + "2,[20,41.6, 14,45.2136],20,41.6,1", + "2,[20,41.6, 14,45.2136],14,45.2136,2", + "3,[18,42.6],18,42.6,1") + assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted) + } + + @TestTemplate + def testUnnestArrayOfRowsWithNestedFilterWithOrdinality(): Unit = { + val data = List( + (1, Array((12, "45.6"), (12, "45.612"))), + (2, Array((13, "41.6"), (14, "45.2136"))), + (3, Array((18, "42.6"))) + ) + val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b) + tEnv.createTemporaryView("MyTable", t) + + val sqlQuery = + """ + |SELECT * FROM ( + | SELECT a, b1, b2, ord FROM + | (SELECT a, b FROM MyTable) T + | CROSS JOIN + | UNNEST(T.b) WITH ORDINALITY as S(b1, b2, ord) + | WHERE S.b1 >= 12 + | ) tmp + |WHERE b2 <> '42.6' AND ord <> 2 + """.stripMargin + + val result = tEnv.sqlQuery(sqlQuery).toDataStream + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val expected = List("1,12,45.6,1", "2,13,41.6,1") + assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted) + } + + @TestTemplate + def testUnnestMultiSetOfRowsFromCollectResultWithOrdinality(): Unit = { + val data = List( + (1, (12, "45.6")), + (2, (12, "45.612")), + (2, (13, "41.6")), + (3, (14, "45.2136")), + (3, (18, "42.6"))) + val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b) + tEnv.createTemporaryView("T", t) Review Comment: See https://github.com/apache/flink/commit/e54bf99dd04c4f8139eeda71472746bb523d6081 ########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/UnnestITCase.scala: ########## @@ -379,4 +380,486 @@ class UnnestITCase(mode: 
StateBackendMode) extends StreamingWithStateTestBase(mo val expected = List("a,1", "a,2", "a,3") assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted) } + + @TestTemplate + def testUnnestWithOrdinalityWithValuesStream(): Unit = { + val sqlQuery = "SELECT * FROM (VALUES('a')) CROSS JOIN UNNEST(ARRAY[1, 2, 3]) WITH ORDINALITY" + val result = tEnv.sqlQuery(sqlQuery).toDataStream + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val expected = List("a,1,1", "a,2,2", "a,3,3") + assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted) + } + + @TestTemplate + def testUnnestArrayWithOrdinality(): Unit = { + val data = List( + (1, Array(12, 45)), + (2, Array(41, 5)), + (3, Array(18, 42)) + ) + val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b) + tEnv.createTemporaryView("T", t) + + val sqlQuery = """ + |SELECT a, number, ordinality + |FROM T CROSS JOIN UNNEST(b) WITH ORDINALITY AS t(number, ordinality) + |""".stripMargin + val result = tEnv.sqlQuery(sqlQuery).toDataStream + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val expected = List("1,12,1", "1,45,2", "2,41,1", "2,5,2", "3,18,1", "3,42,2") + assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted) + } + + @TestTemplate + def testUnnestFromTableWithOrdinality(): Unit = { + val data = List( + (1, 1L, Array("Hi", "w")), + (2, 2L, Array("Hello", "k")), + (3, 2L, Array("Hello world", "x")) + ) + + val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b, 'c) + tEnv.createTemporaryView("T", t) + + val sqlQuery = "SELECT a, s, o FROM T, UNNEST(T.c) WITH ORDINALITY as A (s, o)" + val result = tEnv.sqlQuery(sqlQuery).toDataStream + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val expected = List("1,Hi,1", "1,w,2", "2,Hello,1", "2,k,2", "3,Hello world,1", "3,x,2") + assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted) + } + + @TestTemplate + def 
testUnnestArrayOfArrayWithOrdinality(): Unit = { + val data = List( + (1, Array(Array(1, 2), Array(3))), + (2, Array(Array(4, 5), Array(6, 7))), + (3, Array(Array(8))) + ) + val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'id, 'nested_array) + tEnv.createTemporaryView("T", t) + + val sqlQuery = + """ + |SELECT id, array_val, array_pos, `element`, element_pos + |FROM T + |CROSS JOIN UNNEST(nested_array) WITH ORDINALITY AS A(array_val, array_pos) + |CROSS JOIN UNNEST(array_val) WITH ORDINALITY AS B(`element`, element_pos) + |""".stripMargin + val result = tEnv.sqlQuery(sqlQuery).toDataStream + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val expected = List( + "1,[1, 2],1,1,1", + "1,[1, 2],1,2,2", + "1,[3],2,3,1", + "2,[4, 5],1,4,1", + "2,[4, 5],1,5,2", + "2,[6, 7],2,6,1", + "2,[6, 7],2,7,2", + "3,[8],1,8,1") + assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted) + } + + @TestTemplate + def testUnnestMultisetWithOrdinality(): Unit = { + val data = List( + (1, 1, "Hi"), + (1, 2, "Hello"), + (2, 2, "World"), + (3, 3, "Hello world") + ) + val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b, 'c) + tEnv.createTemporaryView("T", t) + + val sqlQuery = + """ + |WITH T1 AS (SELECT a, COLLECT(c) as words FROM T GROUP BY a) + |SELECT a, word, pos + |FROM T1 CROSS JOIN UNNEST(words) WITH ORDINALITY AS A(word, pos) + |""".stripMargin + val result = tEnv.sqlQuery(sqlQuery).toRetractStream[Row] + val sink = new TestingRetractSink + result.addSink(sink) + env.execute() + + val expected = List( + "1,Hi,1", + "1,Hello,2", + "2,World,1", + "3,Hello world,1" + ) + assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted) + } + + @TestTemplate + def testUnnestMapWithOrdinality(): Unit = { + val data = List( + Row.of( + Int.box(1), { + val map = new java.util.HashMap[String, String]() + map.put("a", "10") + map.put("b", "11") + map + }), + Row.of( + Int.box(2), { + val map = new 
java.util.HashMap[String, String]() + map.put("c", "20") + map.put("d", "21") + map + }) + ) + + implicit val typeInfo = Types.ROW( + Array("id", "map_data"), + Array[TypeInformation[_]](Types.INT, Types.MAP(Types.STRING, Types.STRING)) + ) + val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'id, 'map_data) + tEnv.createTemporaryView("T", t) + + val sqlQuery = """ + |SELECT id, k, v, pos + |FROM T CROSS JOIN UNNEST(map_data) WITH ORDINALITY AS f(k, v, pos) + |""".stripMargin + val result = tEnv.sqlQuery(sqlQuery).toDataStream + + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val resultsWithoutordinality = assertAndRemoveOrdinality(sink.getAppendResults, 2) + val expected = List("1,a,10", "1,b,11", "2,c,20", "2,d,21") + + assertThat(resultsWithoutordinality.sorted).isEqualTo(expected.sorted) + } + + def testUnnestForMapOfRowsWitOrdinality(): Unit = { + val data = List( + Row.of( + Int.box(1), { + val map = new java.util.HashMap[Row, Row]() + map.put(Row.of("a", "a"), Row.of(10: Integer)) + map.put(Row.of("b", "b"), Row.of(11: Integer)) + map + }), + Row.of( + Int.box(2), { + val map = new java.util.HashMap[Row, Row]() + map.put(Row.of("c", "c"), Row.of(20: Integer)) + map + }), + Row.of( + Int.box(3), { + val map = new java.util.HashMap[Row, Row]() + map.put(Row.of("d", "d"), Row.of(30: Integer)) + map.put(Row.of("e", "e"), Row.of(31: Integer)) + map + }) + ) + + implicit val typeInfo = Types.ROW( + Array("a", "b"), + Array[TypeInformation[_]]( + Types.INT, + Types.MAP(Types.ROW(Types.STRING, Types.STRING), Types.ROW(Types.INT()))) + ) + val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b) + tEnv.createTemporaryView("T", t) + + val sqlQuery = "SELECT a, k, v, o FROM T CROSS JOIN UNNEST(b) WITH ORDINALITY as f (k, v, o)" + val result = tEnv.sqlQuery(sqlQuery) + TestSinkUtil.addValuesSink( + tEnv, + "MySink", + List("a", "k", "v", "o"), + List( + DataTypes.INT, + DataTypes.ROW(DataTypes.STRING(), 
DataTypes.STRING()), + DataTypes.ROW(DataTypes.INT()), + DataTypes.INT.notNull()), + ChangelogMode.all() + ) + result.executeInsert("MySink").await() + + val expected = + List("1,a,a,10", "1,b,b,11", "2,c,c,20", "3,d,d,30", "3,e,e,31") + val resultWithoutOrd = assertAndRemoveOrdinality( + TestValuesTableFactory.getResultsAsStrings("MySink").sorted.toList, + 2) + assertThat(resultWithoutOrd).isEqualTo(expected.sorted) + } + + @TestTemplate + def testUnnestWithOrdinalityForChainOfArraysAndMaps(): Unit = { + val data = List( + Row.of( + Int.box(1), + Array("a", "b"), { + val map = new java.util.HashMap[String, String]() + map.put("x", "10") + map.put("y", "20") + map + }), + Row.of( + Int.box(2), + Array("c", "d"), { + val map = new java.util.HashMap[String, String]() + map.put("z", "30") + map.put("w", "40") + map + }) + ) + + implicit val typeInfo = Types.ROW( + Array("id", "array_data", "map_data"), + Array[TypeInformation[_]]( + Types.INT, + Types.OBJECT_ARRAY(Types.STRING), + Types.MAP(Types.STRING, Types.STRING)) + ) + val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'id, 'array_data, 'map_data) + tEnv.createTemporaryView("T", t) + + val sqlQuery = + """ + |SELECT id, array_val, array_pos, map_key, map_val, map_pos + |FROM T + |CROSS JOIN UNNEST(array_data) WITH ORDINALITY AS A(array_val, array_pos) + |CROSS JOIN UNNEST(map_data) WITH ORDINALITY AS B(map_key, map_val, map_pos) + |""".stripMargin + val result = tEnv.sqlQuery(sqlQuery).toDataStream + + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val resultsWithoutOrdinality = assertAndRemoveOrdinality(sink.getAppendResults, 2) + val expected = List( + "1,a,1,x,10", + "1,a,1,y,20", + "1,b,2,x,10", + "1,b,2,y,20", + "2,c,1,z,30", + "2,c,1,w,40", + "2,d,2,z,30", + "2,d,2,w,40" + ) + + assertThat(resultsWithoutOrdinality.sorted).isEqualTo(expected.sorted) + } + + @TestTemplate + def testUnnestWithOrdinalityForEmptyArray(): Unit = { + val data = List( + (1, Array[Int]()) 
+ ) + val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b) + tEnv.createTemporaryView("T", t) + + val sqlQuery = """ + |SELECT a, number, ordinality + |FROM T CROSS JOIN UNNEST(b) WITH ORDINALITY AS t(number, ordinality) + |""".stripMargin + val result = tEnv.sqlQuery(sqlQuery).toDataStream + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val expected = List() + assertThat(sink.getAppendResults.sorted).isEqualTo(expected) + } + + @TestTemplate + def testUnnestWithOrdinalityForMapWithNullValues(): Unit = { + val data = List( + Row.of( + Int.box(1), { + val map = new java.util.HashMap[String, String]() + map.put("a", "10") + map.put("b", null) + map + }), + Row.of( + Int.box(2), { + val map = new java.util.HashMap[String, String]() + map.put("c", "20") + map.put("d", null) + map + }) + ) + + implicit val typeInfo = Types.ROW( + Array("id", "map_data"), + Array[TypeInformation[_]](Types.INT, Types.MAP(Types.STRING, Types.STRING)) + ) + val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'id, 'map_data) + tEnv.createTemporaryView("T", t) + + val sqlQuery = + """ + |SELECT id, k, v, pos + |FROM T CROSS JOIN UNNEST(map_data) WITH ORDINALITY AS f(k, v, pos) + |""".stripMargin + val result = tEnv.sqlQuery(sqlQuery).toDataStream + + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val resultsWithoutordinality = assertAndRemoveOrdinality(sink.getAppendResults, 2) + val expected = List("1,a,10", "1,b,null", "2,c,20", "2,d,null") + assertThat(resultsWithoutordinality.sorted).isEqualTo(expected.sorted) + } + + @TestTemplate + def testUnnestArrayOfRowsFromTableWithOrdinality(): Unit = { + val data = List( + (2, Array((20, "41.6"), (14, "45.2136"))), + (3, Array((18, "42.6"))) + ) + val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b) + tEnv.createTemporaryView("T", t) + + val sqlQuery = + "SELECT a, b, s, t, o FROM T, UNNEST(T.b) WITH ORDINALITY AS A(s, t, o)" + val 
result = tEnv.sqlQuery(sqlQuery).toDataStream + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val expected = List( + "2,[20,41.6, 14,45.2136],20,41.6,1", + "2,[20,41.6, 14,45.2136],14,45.2136,2", + "3,[18,42.6],18,42.6,1") + assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted) + } + + @TestTemplate + def testUnnestArrayOfRowsWithNestedFilterWithOrdinality(): Unit = { + val data = List( + (1, Array((12, "45.6"), (12, "45.612"))), + (2, Array((13, "41.6"), (14, "45.2136"))), + (3, Array((18, "42.6"))) + ) + val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b) + tEnv.createTemporaryView("MyTable", t) + + val sqlQuery = + """ + |SELECT * FROM ( + | SELECT a, b1, b2, ord FROM + | (SELECT a, b FROM MyTable) T + | CROSS JOIN + | UNNEST(T.b) WITH ORDINALITY as S(b1, b2, ord) + | WHERE S.b1 >= 12 + | ) tmp + |WHERE b2 <> '42.6' AND ord <> 2 + """.stripMargin + + val result = tEnv.sqlQuery(sqlQuery).toDataStream + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val expected = List("1,12,45.6,1", "2,13,41.6,1") + assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted) + } + + @TestTemplate + def testUnnestMultiSetOfRowsFromCollectResultWithOrdinality(): Unit = { + val data = List( + (1, (12, "45.6")), + (2, (12, "45.612")), + (2, (13, "41.6")), + (3, (14, "45.2136")), + (3, (18, "42.6"))) + val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b) + tEnv.createTemporaryView("T", t) Review Comment: After some decent amount of refactoring, I've parameterized almost all unnest stream tests, including without ordinality, to use a `assertUnnest` helper function. In total, refactored 24 tests and left 2 out because they use a failingDataSource and I thought it wasn't worth to add one more param to every test just because of the two. 
It's still a decent amount of code, since some language restrictions (like the variadic argument for fieldNames) mean we can't have default values for params. Will definitely go with the new testing patterns in the future. At the same time, I'm not sure it's worth investing more time into it, since we'll eventually port these to Java. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org