Thanks for the feedback. The CSV approach is a good idea and will make my tests more readable; I'll use that.
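For reference, a minimal sketch of what the CSV-based setup could look like. The class name `CsvTestTable`, its method names, and the file name `data.csv` are hypothetical. The DDL assumes the filesystem connector with the csv format, that a single-field ROW column can be read from one CSV value, and that timestamps are accepted in the `yyyy-MM-dd HH:mm:ss` form; none of this was confirmed in this thread.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class CsvTestTable {

    /** Writes the three hard-coded test rows to data.csv in a fresh temp directory. */
    static Path writeTestData() throws IOException {
        Path dir = Files.createTempDirectory("post_events");
        Files.write(dir.resolve("data.csv"), List.of(
                "group123,2021-02-03 11:36:20",
                "group123,2021-02-03 11:38:20",
                "group123,2021-02-03 11:40:20"));
        return dir;
    }

    /**
     * Builds a CREATE TABLE statement that reads the CSV directory and declares
     * event_time as the row time with a 10 minute bounded-out-of-orderness watermark.
     * Assumption: the csv format maps the first CSV column onto the
     * single-field ROW column "created".
     */
    static String createTableDdl(Path dir) {
        return String.join("\n",
                "CREATE TEMPORARY TABLE post_events_kafka (",
                "  created ROW<group_id STRING>,",
                "  event_time TIMESTAMP(3),",
                "  WATERMARK FOR event_time AS event_time - INTERVAL '10' MINUTE",
                ") WITH (",
                "  'connector' = 'filesystem',",
                "  'path' = '" + dir.toUri() + "',",
                "  'format' = 'csv'",
                ")");
    }
}
```

In the test, the statement would then be registered with `tableEnv.executeSql(CsvTestTable.createTableDdl(CsvTestTable.writeTestData()))` before running the query under test.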
Looking forward to Flink 1.13!

Svend

On Fri, 30 Apr 2021, at 9:09 AM, Timo Walther wrote:
> Hi,
>
> there are multiple ways to create a table for testing:
>
> - use the datagen connector
> - use the filesystem connector with CSV data
> - and beginning with Flink 1.13, your code snippet becomes much simpler
>
> Regards,
> Timo
>
> On 29.04.21 20:35, Svend wrote:
> > I found an answer to my own question!
> >
> > For future reference, the snippet below creates a SQL table with
> > a nested field and a watermark, filled with hard-coded values, which
> > is all I need in order to test SQL expressions.
> >
> > It's quite a mouthful though; is there a more succinct way to express
> > the same thing?
> >
> >
> > var testData = List.of(
> >     Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:36:20")),
> >     Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:38:20")),
> >     Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:40:20"))
> > );
> > var testStream = streamEnv
> >     .fromCollection(testData,
> >         Types.ROW_NAMED(new String[] {"created", "event_time"},
> >             Types.ROW_NAMED(new String[] {"group_id"}, Types.STRING),
> >             Types.SQL_TIMESTAMP
> >         )
> >     )
> >     .assignTimestampsAndWatermarks(WatermarkStrategy
> >         .<Row>forBoundedOutOfOrderness(Duration.ofMinutes(10))
> >         .withTimestampAssigner(
> >             TimestampAssignerSupplier.of((t2, t) ->
> >                 ((Timestamp) (t2.getField(1))).getTime()))
> >     );
> > var testDataTable = tableEnv.fromDataStream(testStream,
> >     $("created"), $("event_time").rowtime());
> > tableEnv.createTemporaryView("post_events_kafka", testDataTable);
> >
> >
> > On Thu, 29 Apr 2021, at 7:04 PM, Svend wrote:
> >> I'm trying to write a Java unit test for a Flink SQL application using
> >> the Flink mini cluster, but I have not managed to create an input table
> >> with nested fields and time characteristics.
> >>
> >> I had a look at the documentation and examples below, although I'm
> >> still struggling:
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
> >> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
> >>
> >>
> >> Consider for example this simple expression that I want to test, which
> >> depends on the nested field "created.group_id" and expects
> >> "event_time" to be the row time:
> >>
> >>
> >> var createTableDDl = ""
> >>     + " CREATE TEMPORARY VIEW postCreated10min \n"
> >>     + " AS \n"
> >>     + " SELECT \n"
> >>     + "   created.group_id as groupId, \n"
> >>     + "   TUMBLE_END(event_time, INTERVAL '10' MINUTES) as metricTime, \n"
> >>     + "   TUMBLE_ROWTIME(event_time, INTERVAL '10' MINUTES) as rowTime, \n"
> >>     + "   count(1) as metricValue \n"
> >>     + " FROM post_events_kafka \n"
> >>     + " GROUP BY \n"
> >>     + "   created.group_id, \n"
> >>     + "   TUMBLE(event_time, INTERVAL '10' MINUTES) \n";
> >> tableEnv.executeSql(createTableDDl);
> >>
> >>
> >> In a unit test, the following syntax allows me to create test input
> >> data with nested fields, but I have not found how to specify the row
> >> time or watermarks with this approach:
> >>
> >>
> >> Table testTable = tableEnv.fromValues(
> >>     DataTypes.ROW(
> >>         DataTypes.FIELD("created",
> >>             DataTypes.ROW(
> >>                 DataTypes.FIELD("group_id", DataTypes.STRING())
> >>             )
> >>         ),
> >>         DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3))
> >>     ),
> >>
> >>     row(row("group123"), "2021-02-03 11:36:20"),
> >>     row(row("group123"), "2021-02-03 11:38:20"),
> >>     row(row("group123"), "2021-02-03 11:40:20")
> >> );
> >> tableEnv.createTemporaryView("post_events_kafka", testTable);
> >>
> >>
> >> I have also tried the following syntax, which allows me to specify the
> >> watermark and row time, but I have not found how to create a nested
> >> field with this approach:
> >>
> >>
> >> var testData = List.of(
> >>     Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:36:20")),
> >>     Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:38:20")),
> >>     Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:40:20"))
> >> );
> >> var testStream = streamEnv
> >>     .fromCollection(testData)
> >>     .assignTimestampsAndWatermarks(WatermarkStrategy
> >>         .<Tuple2<String, Timestamp>>forBoundedOutOfOrderness(Duration.ofMinutes(10))
> >>         .withTimestampAssigner(
> >>             TimestampAssignerSupplier.of((t2, t) -> t2.f1.getTime()))
> >>     );
> >> var testDataTable = tableEnv.fromDataStream(
> >>     testStream,
> >>     $("group_id"), $("true_as_of"), $("event_time").rowtime()
> >> );
> >> tableEnv.createTemporaryView("post_events_kafka", testDataTable);
> >>
> >>
> >> What am I missing?