Hello all I need to calculate the difference in time between ordered rows per transactionId. All events should arrive within the timeframe set by the out-of-orderness ( a couple of minutes). Events outside this time should be ignored.
In SQL this would be : select transactionId , handlingTime , *handlingTime - lag(handlingTime,1) over (partition by transactionId order by handlingTime) as elapsedTime* from table When I code : Table result = tableEnv.sqlQuery("select transactionId, handlingTime, *handlingTime - if(null(lag(handlingTime) over (partition by transactionId order by handlingTime),handlingTime) as elapsedTime* from tupled3DsTableVw") *Will this obey the watermark strategy of the original Datastream? (see further below)* I have also tried to use the Table Api with a session window like : Table t = tupled3DsTable .window(Session.withGap(lit(2).minutes()).on($("handlingTime")).as("w")) .groupBy($("transactionId"), $("w")) .select($("handlingTime"), $("transactionId"), $("originalEvent"), $( "handlingTime").max().over($("w"))); This gives: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not resolve over call. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) and also : Table t = tupled3DsTable .window(Over.partitionby($("transactionId")).orderBy($( "handlingTime")).as("w")).select($("handlingTime"), $("transactionId"), $( "originalEvent"), $("handlingTime").lag().as("previousHandlingTime")); Which does not work since it cannot find lag function :-( In java I have the following setup: WatermarkStrategy<Tuple3<Long, String, String>> wmstrategy = WatermarkStrategy .<Tuple3<Long, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness))) .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness))) .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<Long, String, String>>() { @Override public long extractTimestamp(Tuple3<Long, String, String> element, long handlingTime) { return element.f0; }}); DataStream<Tuple3<Long, String, String>> tuple3dswm = tuple3ds.assignTimestampsAndWatermarks(wmstrategy); Table tupled3DsTable = tableEnv.fromDataStream(tuple3ds, Schema.newBuilder().column("f0","TIMESTAMP_LTZ(3)").column("f1","STRING").column("f2","STRING").watermark("f0", "SOURCE_WATERMARK()") .build()).as("handlingTime", "transactionId", "originalEvent");