Hi, I try to create a tumbling time window of 2 rows each in Flink Java. This must based on the dateTime (TimeStamp3 datatype) or unixDateTime(BIGINT datatype) column. I've added below the code of two different code versions. The error messages I get I placed above the code.
When I print the datatypes of the Table object I see this: |-- mID: INT |-- dateTime: TIMESTAMP(3) *ROWTIME* |-- mValue: DOUBLE |-- unixDateTime: BIGINT |-- mType: STRING StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(fsEnv); fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); TupleTypeInfo<Tuple5<Integer, Timestamp, Double, Long, String>> tupleType = new TupleTypeInfo<>( Types.INT(), Types.SQL_TIMESTAMP(), Types.DOUBLE(), Types.LONG(), Types.STRING()); DataStream<Tuple5<Integer, Timestamp, Double, Long, String>> dsTuple = tableEnv.toAppendStream(HTable, tupleType); //When I run below code I get this error: Caused by: java.lang.RuntimeException: Rowtime timestamp is null. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic. Table table = tableEnv.fromDataStream(dsTuple, "mID, dateTime.rowtime, mValue, unixDateTime, mType"); DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class); stream.print(); //When I run below code I get this error: Exception in thread "main" java.lang.UnsupportedOperationException: Event-time grouping windows on row intervals are currently not supported. Table table = tableEnv.fromDataStream(dsTuple, "mID, dateTime.rowtime, measurementValue, unixDateTime, measurementType") .window(Tumble.over("2.rows") .on("dateTime") .as("a")) .groupBy("a") .select("AVG(mValue)"); DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class); stream.print();