I found an answer to my own question!
For future reference, the snippet below creates a SQL table with a nested
field and a watermark, filled with hard-coded values, which is all I need in
order to test SQL expressions.
It's quite a mouthful though. Is there a more succinct way to express the same
thing?
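// Imports used by this fragment (streamEnv and tableEnv are the test's
// StreamExecutionEnvironment and StreamTableEnvironment):
import static org.apache.flink.table.api.Expressions.$;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.List;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

// Each row is (created: ROW(group_id), event_time)
var testData = List.of(
    Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:36:20")),
    Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:38:20")),
    Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:40:20"))
);

var testStream = streamEnv
    // Explicit row type info gives the nested field its name; "group_id"
    // matches the created.group_id column used by the query under test
    .fromCollection(testData,
        Types.ROW_NAMED(new String[] {"created", "event_time"},
            Types.ROW_NAMED(new String[] {"group_id"}, Types.STRING),
            Types.SQL_TIMESTAMP
        )
    )
    // Event time is read from the second field of each row
    .assignTimestampsAndWatermarks(WatermarkStrategy
        .<Row>forBoundedOutOfOrderness(Duration.ofMinutes(10))
        .withTimestampAssigner(
            TimestampAssignerSupplier.of(
                (row, ts) -> ((Timestamp) row.getField(1)).getTime()))
    );

// Declare event_time as the rowtime attribute and register the view
var testDataTable = tableEnv.fromDataStream(testStream, $("created"),
    $("event_time").rowtime());
tableEnv.createTemporaryView("post_events_kafka", testDataTable);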
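
For the assertions of such a test, the expected content of the 10-minute
tumbling view can be worked out by hand from the three hard-coded events. The
sketch below is plain Java with no Flink involved; the class and method names
are hypothetical, and it assumes that windows are aligned to the epoch and that
the timestamps can be treated as UTC wall-clock values. It only illustrates the
arithmetic and is not how Flink implements TUMBLE.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TumbleExpectation {

    // Start of the tumbling window that contains the given timestamp,
    // assuming windows are aligned to the epoch (assumption of this sketch).
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Counts events per window, keyed by the exclusive window end
    // (the value TUMBLE_END would be expected to report).
    static Map<LocalDateTime, Long> countPerWindowEnd(
            List<LocalDateTime> eventTimes, Duration size) {
        Map<LocalDateTime, Long> counts = new TreeMap<>();
        long sizeMillis = size.toMillis();
        for (LocalDateTime eventTime : eventTimes) {
            long millis = eventTime.toInstant(ZoneOffset.UTC).toEpochMilli();
            long endMillis = windowStart(millis, sizeMillis) + sizeMillis;
            LocalDateTime windowEnd =
                LocalDateTime.ofEpochSecond(endMillis / 1000, 0, ZoneOffset.UTC);
            counts.merge(windowEnd, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Same three event times as the test data above
        var eventTimes = List.of(
            LocalDateTime.parse("2021-02-03T11:36:20"),
            LocalDateTime.parse("2021-02-03T11:38:20"),
            LocalDateTime.parse("2021-02-03T11:40:20"));

        countPerWindowEnd(eventTimes, Duration.ofMinutes(10))
            .forEach((end, count) -> System.out.println(end + " -> " + count));
    }
}
```

Under those assumptions, the first two events fall into the window ending at
11:40:00 and the third into the window ending at 11:50:00, so for "group123" I
would expect two result rows with counts 2 and 1.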
On Thu, 29 Apr 2021, at 7:04 PM, Svend wrote:
> I'm trying to write a Java unit test for a Flink SQL application using the
> Flink mini cluster, but I have not managed to create an input table with
> nested fields and time characteristics.
>
> I had a look at the documentation and examples below, although I'm still
> struggling:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
>
>
> Consider for example this simple expression that I want to test, which
> depends on the nested field "created.group_id" and expects "event_time" to
> be the row time:
>
>
> var createTableDDl = ""
>   + " CREATE TEMPORARY VIEW postCreated10min \n"
>   + " AS \n"
>   + " SELECT \n"
>   + " created.group_id as groupId, \n"
>   + " TUMBLE_END(event_time, INTERVAL '10' MINUTES) as metricTime, \n"
>   + " TUMBLE_ROWTIME(event_time, INTERVAL '10' MINUTES) as rowTime, \n"
>   + " count(1) as metricValue \n"
>   + " FROM post_events_kafka \n"
>   + " GROUP BY \n"
>   + " created.group_id, \n"
>   + " TUMBLE(event_time, INTERVAL '10' MINUTES) \n";
> tableEnv.executeSql(createTableDDl);
>
>
> In a unit test, the following syntax allows me to create test input data with
> nested fields, but I have not found how to specify the row time or watermarks
> with this approach:
>
>
> Table testTable = tableEnv.fromValues(
>   DataTypes.ROW(
>     DataTypes.FIELD("created",
>       DataTypes.ROW(
>         DataTypes.FIELD("group_id", DataTypes.STRING())
>       )
>     ),
>     DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3))
>   ),
>
>   row(row("group123"), "2021-02-03 11:36:20"),
>   row(row("group123"), "2021-02-03 11:38:20"),
>   row(row("group123"), "2021-02-03 11:40:20")
> );
> tableEnv.createTemporaryView("post_events_kafka", testTable);
>
>
> I have also tried the following syntax, which lets me specify the watermark
> and row time, but I have not found how to create a nested field with this
> approach:
>
>
> var testData = List.of(
>   Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:36:20")),
>   Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:38:20")),
>   Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:40:20"))
> );
> var testStream = streamEnv
>   .fromCollection(testData)
>   .assignTimestampsAndWatermarks(WatermarkStrategy
>     .<Tuple2<String, Timestamp>>forBoundedOutOfOrderness(Duration.ofMinutes(10))
>     .withTimestampAssigner(
>       TimestampAssignerSupplier.of((t2, t) -> t2.f1.getTime()))
>   );
> var testDataTable = tableEnv.fromDataStream(
>   testStream,
>   $("group_id"), $("true_as_of"), $("event_time").rowtime()
> );
> tableEnv.createTemporaryView("post_events_kafka", testDataTable);
>
>
>
> What am I missing?