I found an answer to my own question! For future reference, the snippet below creates a SQL table with a nested field and a watermark, filled with hard-coded values, which is all I need to test SQL expressions.
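It's quite a mouthful though; is there a more succinct way to express the same thing?

import java.sql.Timestamp;
import java.time.Duration;
import java.util.List;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;

// Rows of (created: ROW<group_id STRING>, event_time: TIMESTAMP)
var testData = List.of(
    Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:36:20")),
    Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:38:20")),
    Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:40:20"))
);

var testStream = streamEnv
    // explicit type info gives the (nested) fields their names
    .fromCollection(testData,
        Types.ROW_NAMED(new String[] {"created", "event_time"},
            Types.ROW_NAMED(new String[] {"group_id"}, Types.STRING),
            Types.SQL_TIMESTAMP))
    // event time is read from the second field, with up to 10 minutes of out-of-orderness
    .assignTimestampsAndWatermarks(WatermarkStrategy
        .<Row>forBoundedOutOfOrderness(Duration.ofMinutes(10))
        .withTimestampAssigner(
            TimestampAssignerSupplier.of((row, recordTimestamp) -> ((Timestamp) row.getField(1)).getTime())));

// expose event_time as the rowtime attribute of the table
var testDataTable = tableEnv.fromDataStream(testStream, $("created"), $("event_time").rowtime());
tableEnv.createTemporaryView("post_events_kafka", testDataTable);

On Thu, 29 Apr 2021, at 7:04 PM, Svend wrote:
> I'm trying to write a Java unit test for a Flink SQL application using the Flink
> mini cluster, but I cannot manage to create an input table with nested fields
> and time characteristics.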
>
> I had a look at the documentation and examples below, although I'm still
> struggling:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
>
> Consider for example this simple view that I want to test, which depends on
> the nested field "created.group_id" and expects "event_time" to be the row
> time attribute:
>
> var createTableDDl = ""
>     + " CREATE TEMPORARY VIEW postCreated10min \n"
>     + " AS \n"
>     + " SELECT \n"
>     + "   created.group_id as groupId, \n"
>     + "   TUMBLE_END(event_time, INTERVAL '10' MINUTES) as metricTime, \n"
>     + "   TUMBLE_ROWTIME(event_time, INTERVAL '10' MINUTES) as rowTime, \n"
>     + "   count(1) as metricValue \n"
>     + " FROM post_events_kafka \n"
>     + " GROUP BY \n"
>     + "   created.group_id, \n"
>     + "   TUMBLE(event_time, INTERVAL '10' MINUTES) \n";
> tableEnv.executeSql(createTableDDl);
>
> In a unit test, the following syntax allows me to create test input data with
> nested fields, but I have not found how to specify the row time or watermarks
> with this approach:
>
> Table testTable = tableEnv.fromValues(
>     DataTypes.ROW(
>         DataTypes.FIELD("created",
>             DataTypes.ROW(
>                 DataTypes.FIELD("group_id", DataTypes.STRING())
>             )
>         ),
>         DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3))
>     ),
>     row(row("group123"), "2021-02-03 11:36:20"),
>     row(row("group123"), "2021-02-03 11:38:20"),
>     row(row("group123"), "2021-02-03 11:40:20")
> );
> tableEnv.createTemporaryView("post_events_kafka", testTable);
>
> I have also tried the following syntax, which lets me specify the watermark and
> row time, but I have not found how to create a nested field with this
> approach:
>
> var testData = List.of(
>     Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:36:20")),
>     Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:38:20")),
>     Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:40:20"))
> );
> var testStream = streamEnv
>     .fromCollection(testData)
>     .assignTimestampsAndWatermarks(WatermarkStrategy
>         .<Tuple2<String, Timestamp>>forBoundedOutOfOrderness(Duration.ofMinutes(10))
>         .withTimestampAssigner(
>             TimestampAssignerSupplier.of((t2, t) -> t2.f1.getTime()))
>     );
> var testDataTable = tableEnv.fromDataStream(
>     testStream,
>     $("group_id"), $("true_as_of"), $("event_time").rowtime()
> );
> tableEnv.createTemporaryView("post_events_kafka", testDataTable);
>
> What am I missing?
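Since the goal is to unit-test the postCreated10min view, it can help to work out by hand what the view should emit for the three test rows, so the test has something to assert against. Below is a dependency-free Java sketch of that calculation. The class ExpectedTumbleCounts and its methods are hypothetical helpers, not Flink APIs. It mirrors the window semantics as I understand them from the Flink docs: TUMBLE puts each row into a 10-minute window aligned to the epoch, TUMBLE_END is the exclusive window end, TUMBLE_ROWTIME is that end minus 1 ms, and a bounded-out-of-orderness watermark is the highest timestamp seen minus the allowed delay minus 1 ms. It assumes the timestamps are read as UTC wall-clock values.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java helper (not a Flink API): predicts what the
// postCreated10min view should emit, assuming epoch-aligned tumbling windows
// and timestamps read as UTC wall-clock values.
public class ExpectedTumbleCounts {

    static final Duration WINDOW = Duration.ofMinutes(10);
    static final Duration ONE_MILLI = Duration.ofMillis(1);

    // One test input row: the nested created.group_id plus event_time.
    static final class Event {
        final String groupId;
        final LocalDateTime eventTime;

        Event(String groupId, LocalDateTime eventTime) {
            this.groupId = groupId;
            this.eventTime = eventTime;
        }
    }

    // One expected output row of the view.
    static final class WindowResult {
        final String groupId;
        final LocalDateTime metricTime; // TUMBLE_END: exclusive window end
        final LocalDateTime rowTime;    // TUMBLE_ROWTIME: window end minus 1 ms
        final long metricValue;         // count(1)

        WindowResult(String groupId, LocalDateTime metricTime, LocalDateTime rowTime, long metricValue) {
            this.groupId = groupId;
            this.metricTime = metricTime;
            this.rowTime = rowTime;
            this.metricValue = metricValue;
        }

        @Override
        public String toString() {
            return groupId + " | " + metricTime + " | " + rowTime + " | " + metricValue;
        }
    }

    // Start of the tumbling window containing ts: ts - (ts mod windowSize).
    static LocalDateTime windowStart(LocalDateTime ts) {
        long millis = ts.toInstant(ZoneOffset.UTC).toEpochMilli();
        long start = millis - Math.floorMod(millis, WINDOW.toMillis());
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(start), ZoneOffset.UTC);
    }

    // Counts rows per (group_id, window), like
    // GROUP BY created.group_id, TUMBLE(event_time, INTERVAL '10' MINUTES).
    static List<WindowResult> expectedResults(List<Event> events) {
        Map<String, Map<LocalDateTime, Long>> counts = new TreeMap<>();
        for (Event e : events) {
            counts.computeIfAbsent(e.groupId, k -> new TreeMap<>())
                  .merge(windowStart(e.eventTime), 1L, Long::sum);
        }
        List<WindowResult> results = new ArrayList<>();
        counts.forEach((group, windows) -> windows.forEach((start, count) -> {
            LocalDateTime end = start.plus(WINDOW);
            results.add(new WindowResult(group, end, end.minus(ONE_MILLI), count));
        }));
        return results;
    }

    // Watermark of a bounded-out-of-orderness strategy:
    // highest event time seen so far, minus the allowed delay, minus 1 ms.
    static LocalDateTime watermark(LocalDateTime maxSeen, Duration maxDelay) {
        return maxSeen.minus(maxDelay).minus(ONE_MILLI);
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("group123", LocalDateTime.parse("2021-02-03T11:36:20")),
            new Event("group123", LocalDateTime.parse("2021-02-03T11:38:20")),
            new Event("group123", LocalDateTime.parse("2021-02-03T11:40:20")));

        expectedResults(events).forEach(System.out::println);
        System.out.println("watermark after last row: "
            + watermark(LocalDateTime.parse("2021-02-03T11:40:20"), Duration.ofMinutes(10)));
    }
}
```

For the test data this predicts two rows for group123: a count of 2 for the window ending at 11:40 and a count of 1 for the window ending at 11:50. The watermark after the last row is 11:30:19.999, earlier than both window ends, so if I read the watermark logic correctly the windows only fire once the bounded fromCollection source finishes and emits its final watermark. An assertion that expects results before the job ends would likely see nothing.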