Hi,

I try to create a tumbling time window of 2 rows each in Flink Java. This must 
based on the dateTime (TimeStamp3 datatype) or unixDateTime(BIGINT datatype) 
column. I've added below the code of two different code versions. The error 
messages I get I placed above the code.

When I print the datatypes of the Table object I see this:  |-- mID: INT |-- 
dateTime: TIMESTAMP(3) *ROWTIME* |-- mValue: DOUBLE |-- unixDateTime: BIGINT 
|-- mType: STRING

StreamExecutionEnvironment fsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(fsEnv);
        fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    TupleTypeInfo<Tuple5<Integer, Timestamp, Double, Long, String>> tupleType = 
new TupleTypeInfo<>(
        Types.INT(),
        Types.SQL_TIMESTAMP(),
        Types.DOUBLE(),
        Types.LONG(),
        Types.STRING());
        DataStream<Tuple5<Integer, Timestamp, Double, Long, String>> dsTuple =
                tableEnv.toAppendStream(HTable, tupleType);

//When I run below code I get this error: Caused by: 
java.lang.RuntimeException: Rowtime timestamp is null. Please make sure that a 
proper TimestampAssigner is defined and the stream environment uses the 
EventTime time characteristic.

Table table = tableEnv.fromDataStream(dsTuple, "mID, dateTime.rowtime, mValue, 
unixDateTime, mType");
   DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class);
stream.print();

//When I run below code I get this error: Exception in thread "main" 
java.lang.UnsupportedOperationException: Event-time grouping windows on row 
intervals are currently not supported.

   Table table = tableEnv.fromDataStream(dsTuple, "mID, dateTime.rowtime, 
measurementValue, unixDateTime, measurementType")
.window(Tumble.over("2.rows")
.on("dateTime")
.as("a"))
.groupBy("a")
.select("AVG(mValue)");
    DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class);
    stream.print();

Reply via email to