Hi A.V.,
> When I run below code I get this error: Caused by:
> java.lang.RuntimeException: Rowtime timestamp is null. Please make sure
> that a proper TimestampAssigner is defined and the stream environment uses
> the EventTime time characteristic.
You need to assign timestamps and watermarks to the DataStream before you
convert it to a Table with a rowtime attribute.
Sample DataStream code:
DataStream<Tuple2<String, Timestamp>> lineitem = bsEnv
    .socketTextStream("localhost", 12347)
    .flatMap(new Splitter2())
    .assignTimestampsAndWatermarks(
        new AscendingTimestampExtractor<Tuple2<String, Timestamp>>() {
            @Override
            public long extractAscendingTimestamp(
                    Tuple2<String, Timestamp> inputrowtuple) {
                // f1 is the Timestamp field (a Tuple2 only has f0 and f1)
                return inputrowtuple.f1.getTime();
            }
        });
> When I run below code I get this error: Exception in thread "main"
> java.lang.UnsupportedOperationException: Event-time grouping windows on row
> intervals are currently not supported.
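Currently, group windows on row intervals are only supported for processing
time, so use a .proctime attribute in the schema instead of .rowtime and
window on that. As far as I know, the .proctime attribute has to be appended
as an extra field at the end of the schema; it cannot replace an existing
field such as dateTime.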
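
To make the row-count window semantics concrete, here is a plain-Java sketch
(no Flink dependency) of what a tumbling window of 2 rows with AVG(mValue)
computes when rows are grouped purely in arrival order, which is how a
processing-time row-count window behaves. The class name RowCountTumble and
the method tumblingAverages are made up for this illustration and are not
Flink APIs. The sketch also assumes that a trailing incomplete window is not
emitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RowCountTumble {

    /**
     * Averages the values in consecutive, non-overlapping groups of
     * `size` rows, taken in arrival order. A trailing group with fewer
     * than `size` rows is dropped (assumption of this sketch).
     */
    public static List<Double> tumblingAverages(List<Double> values, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<Double> averages = new ArrayList<>();
        double sum = 0.0;
        int count = 0;
        for (double value : values) {
            sum += value;
            count++;
            if (count == size) {
                // The window is full: emit its average and start a new one.
                averages.add(sum / size);
                sum = 0.0;
                count = 0;
            }
        }
        return averages;
    }

    public static void main(String[] args) {
        // Hypothetical mValue column, in the order the rows arrive.
        List<Double> mValues = Arrays.asList(1.0, 3.0, 10.0, 20.0, 7.0);
        System.out.println(tumblingAverages(mValues, 2)); // prints [2.0, 15.0]
    }
}
```

The fifth row (7.0) stays in an open window and produces no output until a
sixth row arrives.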
On Wed, Oct 23, 2019 at 1:11 PM A. V. wrote:
> Hi,
>
> I am trying to create a tumbling time window of 2 rows each in Flink Java.
> It must be based on the dateTime (TIMESTAMP(3) data type) or unixDateTime
> (BIGINT data type) column. Below are two different versions of the code.
> The error message each version produces is given in the comment above it.
>
> When I print the datatypes of the Table object I see this:
> |-- mID: INT
> |-- dateTime: TIMESTAMP(3) *ROWTIME*
> |-- mValue: DOUBLE
> |-- unixDateTime: BIGINT
> |-- mType: STRING
>
> StreamExecutionEnvironment fsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(fsEnv);
> fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> TupleTypeInfo<Tuple5<Integer, Timestamp, Double, Long, String>> tupleType
> = new TupleTypeInfo<>(
> Types.INT(),
> Types.SQL_TIMESTAMP(),
> Types.DOUBLE(),
> Types.LONG(),
> Types.STRING());
> DataStream<Tuple5<Integer, Timestamp, Double, Long, String>> dsTuple =
> tableEnv.toAppendStream(HTable, tupleType);
> //When I run below code I get this error: Caused by:
> java.lang.RuntimeException: Rowtime timestamp is null. Please make sure that
> a proper TimestampAssigner is defined and the stream environment uses the
> EventTime time characteristic.
> Table table = tableEnv.fromDataStream(dsTuple, "mID, dateTime.rowtime,
> mValue, unixDateTime, mType");
> DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class);
> stream.print();
> //When I run below code I get this error: Exception in thread "main"
> java.lang.UnsupportedOperationException: Event-time grouping windows on row
> intervals are currently not supported.
>
> Table table = tableEnv.fromDataStream(dsTuple, "mID, dateTime.rowtime,
> measurementValue, unixDateTime,
> measurementType").window(Tumble.over("2.rows").on("dateTime").as("a")).groupBy("a").select("AVG(mValue)");
> DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class);
> stream.print();
>
>
>
--
Regards,
Manoj Kumar