I'm trying to write a Java unit test for a Flink SQL application using the Flink mini cluster, but I cannot work out how to create an input table that has both nested fields and time attributes (row time and watermarks).
I had a look at the documentation and examples below, but I'm still struggling:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java

Consider for example this simple expression that I want to test. It depends on the nested field "created.group_id" and expects "event_time" to be the row time:

    var createTableDDl = ""
        + " CREATE TEMPORARY VIEW postCreated10min \n"
        + " AS \n"
        + " SELECT \n"
        + " created.group_id as groupId, \n"
        + " TUMBLE_END(event_time, INTERVAL '10' MINUTES) as metricTime, \n"
        + " TUMBLE_ROWTIME(event_time, INTERVAL '10' MINUTES) as rowTime, \n"
        + " count(1) as metricValue \n"
        + " FROM post_events_kafka \n"
        + " GROUP BY \n"
        + " created.group_id, \n"
        + " TUMBLE(event_time, INTERVAL '10' MINUTES) \n";
    tableEnv.executeSql(createTableDDl);

In a unit test, the following syntax lets me create test input data with nested fields, but I have not found how to specify the row time or watermarks with this approach:

    Table testTable = tableEnv.fromValues(
        DataTypes.ROW(
            DataTypes.FIELD("created",
                DataTypes.ROW(
                    DataTypes.FIELD("group_id", DataTypes.STRING())
                )
            ),
            DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3))
        ),
        row(row("group123"), "2021-02-03 11:36:20"),
        row(row("group123"), "2021-02-03 11:38:20"),
        row(row("group123"), "2021-02-03 11:40:20")
    );
    tableEnv.createTemporaryView("post_events_kafka", testTable);

I have also tried the following syntax, which lets me specify the watermark and row time, but I have not found how to create a nested field with this approach:

    var testData = List.of(
        Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:36:20")),
        Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:38:20")),
        Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:40:20"))
    );

    var testStream = streamEnv
        .fromCollection(testData)
        .assignTimestampsAndWatermarks(WatermarkStrategy
            .<Tuple2<String, Timestamp>>forBoundedOutOfOrderness(Duration.ofMinutes(10))
            .withTimestampAssigner(
                TimestampAssignerSupplier.of((t2, t) -> t2.f1.getTime()))
        );

    var testDataTable = tableEnv.fromDataStream(
        testStream,
        $("group_id"),
        $("true_as_of"),
        $("event_time").rowtime()
    );
    tableEnv.createTemporaryView("post_events_kafka", testDataTable);

What am I missing?
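For reference, the rows the view should produce for the three sample events can be worked out without Flink, which gives the unit test something to assert against. The snippet below is only a plain-Java sketch: `TumbleWindowSketch`, `windowEnd` and `countPerWindow` are hypothetical names, not Flink APIs, and it assumes epoch-aligned 10-minute tumbling windows where `TUMBLE_END` is the exclusive upper bound of the window.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Flink: mimics epoch-aligned tumbling windows.
public class TumbleWindowSketch {

    // Exclusive upper bound of the tumbling window that contains eventTime
    // (assumed to match what TUMBLE_END returns for the same window size).
    public static LocalDateTime windowEnd(LocalDateTime eventTime, Duration size) {
        long sizeMs = size.toMillis();
        long ts = eventTime.toInstant(ZoneOffset.UTC).toEpochMilli();
        long windowStart = ts - Math.floorMod(ts, sizeMs);
        return LocalDateTime.ofInstant(
                Instant.ofEpochMilli(windowStart + sizeMs), ZoneOffset.UTC);
    }

    // Number of events per window end, i.e. the expected metricValue per metricTime
    // for a single group.
    public static Map<LocalDateTime, Long> countPerWindow(
            List<LocalDateTime> eventTimes, Duration size) {
        Map<LocalDateTime, Long> counts = new TreeMap<>();
        for (LocalDateTime t : eventTimes) {
            counts.merge(windowEnd(t, size), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Same event times as the test data in the question (all for "group123").
        List<LocalDateTime> events = List.of(
                LocalDateTime.parse("2021-02-03T11:36:20"),
                LocalDateTime.parse("2021-02-03T11:38:20"),
                LocalDateTime.parse("2021-02-03T11:40:20"));
        countPerWindow(events, Duration.ofMinutes(10))
                .forEach((end, count) ->
                        System.out.println("group123 metricTime=" + end + " metricValue=" + count));
    }
}
```

Under those assumptions the first two events fall into the window [11:30, 11:40) and the third into [11:40, 11:50), so the view would be expected to yield two rows for "group123": metricTime 11:40:00 with metricValue 2, and metricTime 11:50:00 with metricValue 1.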