Thanks for the feedback.
The CSV approach is a good idea and will make my tests more readable, so I'll use that.
Looking forward to Flink 1.13!
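
For future readers, here is a rough sketch of how the CSV variant might look. It only prepares the CSV file and the DDL string, so it compiles without Flink on the classpath. The class name CsvTestTable and its methods are hypothetical helpers, not Flink API. I am also assuming that the csv format reads a single-field ROW column straight from its CSV column, which should be checked against the Flink version in use.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical helper (not part of Flink): writes the test events to a CSV
// file and builds a matching CREATE TABLE statement for the filesystem connector.
class CsvTestTable {

    // The same three events as in the hard-coded test data quoted below.
    static List<String> csvLines() {
        return List.of(
            "group123,2021-02-03 11:36:20",
            "group123,2021-02-03 11:38:20",
            "group123,2021-02-03 11:40:20");
    }

    // Writes the events to a temporary file and returns its path.
    static Path writeCsv() {
        try {
            Path file = Files.createTempFile("post_events", ".csv");
            return Files.write(file, csvLines());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Builds the DDL: nested field, rowtime column, and a 10-minute watermark delay.
    // Assumption: a single-field ROW is parsed from one CSV column.
    static String ddl(Path csvFile) {
        return String.join("\n",
            "CREATE TEMPORARY TABLE post_events_kafka (",
            "  created ROW<group_id STRING>,",
            "  event_time TIMESTAMP(3),",
            "  WATERMARK FOR event_time AS event_time - INTERVAL '10' MINUTE",
            ") WITH (",
            "  'connector' = 'filesystem',",
            "  'path' = '" + csvFile.toUri() + "',",
            "  'format' = 'csv'",
            ")");
    }

    public static void main(String[] args) {
        String ddl = ddl(writeCsv());
        System.out.println(ddl);
        // In the actual unit test this would be followed by: tableEnv.executeSql(ddl);
    }
}
```

The watermark is declared in the DDL itself, so no DataStream conversion is needed to get a rowtime attribute.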
Svend
On Fri, 30 Apr 2021, at 9:09 AM, Timo Walther wrote:
> Hi,
>
> there are multiple ways to create a table for testing:
>
> - use the datagen connector
> - use the filesystem connector with CSV data
> - and starting with Flink 1.13, your code snippets become much simpler
>
> Regards,
> Timo
>
> On 29.04.21 20:35, Svend wrote:
> > I found an answer to my own question!
> >
> > For future reference, the snippet below creates a SQL table with
> > a nested field and a watermark, filled with hard-coded values, which
> > is all I need in order to test SQL expressions.
> >
> > It's quite a mouthful though. Is there a more succinct way to express the
> > same thing?
> >
> >
> > var testData = List.of(
> >     Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:36:20")),
> >     Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:38:20")),
> >     Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:40:20"))
> > );
> > var testStream = streamEnv
> >     .fromCollection(testData,
> >         Types.ROW_NAMED(new String[] {"created", "event_time"},
> >             Types.ROW_NAMED(new String[] {"group_id"}, Types.STRING),
> >             Types.SQL_TIMESTAMP
> >         )
> >     )
> >     .assignTimestampsAndWatermarks(WatermarkStrategy
> >         .<Row>forBoundedOutOfOrderness(Duration.ofMinutes(10))
> >         .withTimestampAssigner(
> >             TimestampAssignerSupplier.of((t2, t) -> ((Timestamp) (t2.getField(1))).getTime()))
> >     );
> > var testDataTable = tableEnv.fromDataStream(testStream, $("created"),
> >     $("event_time").rowtime());
> > tableEnv.createTemporaryView("post_events_kafka", testDataTable);
> >
> >
> >
> >
> >
> > On Thu, 29 Apr 2021, at 7:04 PM, Svend wrote:
> >> I'm trying to write a Java unit test for a Flink SQL application using
> >> the Flink mini cluster, but I have not managed to create an input table with
> >> nested fields and time characteristics.
> >>
> >> I had a look at the documentation and examples below, but I'm
> >> still struggling:
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
> >> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
> >>
> >>
> >> Consider for example this simple expression that I want to test, which
> >> depends on the nested field "created.group_id" and expects
> >> "event_time" to be the row time:
> >>
> >>
> >> var createTableDDl = ""
> >>     + " CREATE TEMPORARY VIEW postCreated10min \n"
> >>     + " AS \n"
> >>     + " SELECT \n"
> >>     + "   created.group_id as groupId, \n"
> >>     + "   TUMBLE_END(event_time, INTERVAL '10' MINUTES) as metricTime, \n"
> >>     + "   TUMBLE_ROWTIME(event_time, INTERVAL '10' MINUTES) as rowTime, \n"
> >>     + "   count(1) as metricValue \n"
> >>     + " FROM post_events_kafka \n"
> >>     + " GROUP BY \n"
> >>     + "   created.group_id, \n"
> >>     + "   TUMBLE(event_time, INTERVAL '10' MINUTES) \n";
> >> tableEnv.executeSql(createTableDDl);
> >>
> >>
> >> In a unit test, the following syntax allows me to create test input
> >> data with nested fields, but I have not found how to specify the row time
> >> or watermarks with this approach:
> >>
> >>
> >> Table testTable = tableEnv.fromValues(
> >>     DataTypes.ROW(
> >>         DataTypes.FIELD("created",
> >>             DataTypes.ROW(
> >>                 DataTypes.FIELD("group_id", DataTypes.STRING())
> >>             )
> >>         ),
> >>         DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3))
> >>     ),
> >>
> >>     row(row("group123"), "2021-02-03 11:36:20"),
> >>     row(row("group123"), "2021-02-03 11:38:20"),
> >>     row(row("group123"), "2021-02-03 11:40:20")
> >> );
> >> tableEnv.createTemporaryView("post_events_kafka", testTable);
> >>
> >>
> >> I have also tried the following syntax, which lets me specify a
> >> watermark and row time, but I have not found how to create a nested
> >> field with this approach:
> >>
> >>
> >> var testData = List.of(
> >>     Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:36:20")),
> >>     Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:38:20")),
> >>     Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:40:20"))
> >> );
> >> var testStream = streamEnv
> >>     .fromCollection(testData)
> >>     .assignTimestampsAndWatermarks(WatermarkStrategy
> >>         .<Tuple2<String, Timestamp>>forBoundedOutOfOrderness(Duration.ofMinutes(10))
> >>         .withTimestampAssigner(
> >>             TimestampAssignerSupplier.of((t2, t) -> t2.f1.getTime()))
> >>     );
> >> var testDataTable = tableEnv.fromDataStream(
> >>     testStream,
> >>     $("group_id"), $("true_as_of"), $("event_time").rowtime()
> >> );
> >> tableEnv.createTemporaryView("post_events_kafka", testDataTable);
> >>
> >>
> >>
> >> What am I missing?
> >
>
>