I'm trying to write a Java unit test for a Flink SQL application using a Flink 
mini cluster, but I have not managed to create an input table that has both 
nested fields and time attributes (row time and watermarks).

I have looked at the documentation and the example below, but I'm still 
struggling:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java


Consider, for example, this simple query that I want to test. It depends on 
the nested field "created.group_id" and expects "event_time" to be the row 
time attribute:


var createTableDDl = ""
        + " CREATE TEMPORARY VIEW postCreated10min \n"
        + " AS \n"
        + " SELECT \n"
        + "   created.group_id as groupId, \n"
        + "   TUMBLE_END(event_time, INTERVAL '10' MINUTES) as metricTime, \n"
        + "   TUMBLE_ROWTIME(event_time, INTERVAL '10' MINUTES) as rowTime, \n"
        + "   count(1) as metricValue \n"
        + " FROM post_events_kafka \n"
        + " GROUP BY \n"
        + "   created.group_id, \n"
        + "   TUMBLE(event_time, INTERVAL '10' MINUTES) \n";
tableEnv.executeSql(createTableDDl);


In a unit test, the following syntax lets me create test input data with 
nested fields, but I have not found a way to declare a row time attribute or 
watermarks with this approach:


Table testTable = tableEnv.fromValues(
  DataTypes.ROW(
    DataTypes.FIELD("created",
      DataTypes.ROW(
        DataTypes.FIELD("group_id", DataTypes.STRING())
      )
    ),
    DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3))
  ),
 
  row(row("group123"), "2021-02-03 11:36:20"),
  row(row("group123"), "2021-02-03 11:38:20"),
  row(row("group123"), "2021-02-03 11:40:20")
);
tableEnv.createTemporaryView("post_events_kafka", testTable);
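
For reference, this is the result I would expect from the view for the three 
sample rows above, worked out in plain Java without Flink. It is only a sketch: 
the class and method names are made up for illustration, and it assumes that 
tumbling windows are aligned to the epoch, that TUMBLE_END is the exclusive 
upper bound of the window, and that TUMBLE_ROWTIME is that bound minus one 
millisecond.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Flink: computes the expected window results by hand.
public class TumbleWindowSketch {

    // Start of the tumbling window containing ts (windows aligned to the epoch).
    static LocalDateTime windowStart(LocalDateTime ts, Duration size) {
        long millis = ts.toInstant(ZoneOffset.UTC).toEpochMilli();
        long start = millis - Math.floorMod(millis, size.toMillis());
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(start), ZoneOffset.UTC);
    }

    // Exclusive upper bound of the window, i.e. what TUMBLE_END is assumed to return.
    static LocalDateTime windowEnd(LocalDateTime ts, Duration size) {
        return windowStart(ts, size).plus(size);
    }

    // Window end minus 1 ms, i.e. what TUMBLE_ROWTIME is assumed to return.
    static LocalDateTime windowRowtime(LocalDateTime ts, Duration size) {
        return windowEnd(ts, size).minusNanos(1_000_000);
    }

    // Number of events per window end, mirroring count(1) grouped by the window.
    static Map<LocalDateTime, Long> countPerWindowEnd(List<LocalDateTime> events, Duration size) {
        Map<LocalDateTime, Long> counts = new TreeMap<>();
        for (LocalDateTime ts : events) {
            counts.merge(windowEnd(ts, size), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Duration tenMinutes = Duration.ofMinutes(10);
        List<LocalDateTime> events = List.of(
            LocalDateTime.parse("2021-02-03T11:36:20"),
            LocalDateTime.parse("2021-02-03T11:38:20"),
            LocalDateTime.parse("2021-02-03T11:40:20"));
        // Two events fall into the window ending at 11:40, one into the window ending at 11:50.
        System.out.println(countPerWindowEnd(events, tenMinutes));
    }
}
```

So for group123 the test should see metricValue 2 with metricTime 11:40:00 and 
metricValue 1 with metricTime 11:50:00, provided the windows actually fire.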


I have also tried the following syntax, which lets me specify watermarks and a 
row time attribute, but I have not found a way to create a nested field with 
this approach:


var testData = List.of(
  Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:36:20")),
  Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:38:20")),
  Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:40:20"))
);
var testStream = streamEnv
  .fromCollection(testData)
  .assignTimestampsAndWatermarks(WatermarkStrategy
  .<Tuple2<String, Timestamp>>forBoundedOutOfOrderness(Duration.ofMinutes(10))
  .withTimestampAssigner(
    TimestampAssignerSupplier.of((t2, t) -> t2.f1.getTime()))
  );
var testDataTable = tableEnv.fromDataStream(
  testStream,
  $("group_id"), $("true_as_of"), $("event_time").rowtime()
);
tableEnv.createTemporaryView("post_events_kafka", testDataTable);



What am I missing?
