> Which does not work since it cannot find lag function :-(

lag and over are not supported at the moment with Table, so you need to use SQL for that.
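
To make the intent of the lag query concrete, here is a minimal plain-Java sketch of what "handlingTime minus the previous handlingTime per transactionId" computes. It does not use Flink at all; the class `LagSketch`, the `Event` type and the method name are hypothetical and only for illustration. It assumes that the first row of each transaction should get an elapsed time of 0, which is what the if/null fallback in the quoted query appears to aim for.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class LagSketch {

    /** Hypothetical event: handlingTime in epoch millis plus a transactionId. */
    static final class Event {
        final long handlingTime;
        final String transactionId;

        Event(long handlingTime, String transactionId) {
            this.handlingTime = handlingTime;
            this.transactionId = transactionId;
        }
    }

    /**
     * Returns rows of the form "transactionId,handlingTime,elapsedTime",
     * ordered by handlingTime. elapsedTime is the difference to the previous
     * event of the same transaction, or 0 for the first one (an assumption).
     */
    static List<String> elapsedPerTransaction(List<Event> events) {
        List<Event> sorted = new ArrayList<>(events);
        sorted.sort(Comparator.comparingLong((Event e) -> e.handlingTime));

        // Last seen handlingTime per transactionId, i.e. the "lag" value.
        Map<String, Long> previous = new HashMap<>();
        List<String> rows = new ArrayList<>();
        for (Event e : sorted) {
            Long lag = previous.put(e.transactionId, e.handlingTime);
            long elapsed = (lag == null) ? 0L : e.handlingTime - lag;
            rows.add(e.transactionId + "," + e.handlingTime + "," + elapsed);
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(4000L, "a"),
                new Event(1000L, "a"),
                new Event(1500L, "b"));
        for (String row : elapsedPerTransaction(events)) {
            System.out.println(row);
        }
    }
}
```

The SQL version expresses the same thing declaratively with lag(...) over (partition by transactionId order by handlingTime); the map of previous values above plays the role of the per-partition state.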
> *Will this obey the watermark strategy of the original Datastream? (see further below)*

Yes. The code at the end of the mail is correct and should work fine. I have just one comment: if you're using this DataStream only to create the Table instance, you could also just define the watermark using the Schema builder itself, as described here:
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/Schema.Builder.html#watermark-java.lang.String-org.apache.flink.table.expressions.Expression-

On Wed, Feb 16, 2022 at 2:35 PM HG <hanspeter.sl...@gmail.com> wrote:

> Hello all
>
> I need to calculate the difference in time between ordered rows per
> transactionId. All events should arrive within the timeframe set by the
> out-of-orderness ( a couple of minutes). Events outside this time should be
> ignored.
>
> In SQL this would be :
> select transactionId , handlingTime ,
>   *handlingTime - lag(handlingTime,1) over (partition by transactionId order by handlingTime) as elapsedTime*
> from table
>
> When I code :
> Table result = tableEnv.sqlQuery("select transactionId, handlingTime,
>   *handlingTime - if(null(lag(handlingTime) over (partition by transactionId order by handlingTime),handlingTime) as elapsedTime*
>   from tupled3DsTableVw")
>
> *Will this obey the watermark strategy of the original Datastream? (see
> further below)*
>
> I have also tried to use the Table Api with a session window like :
> Table t = tupled3DsTable
>     .window(Session.withGap(lit(2).minutes()).on($("handlingTime")).as("w"))
>     .groupBy($("transactionId"), $("w"))
>     .select($("handlingTime"), $("transactionId"), $("originalEvent"), $("handlingTime").max().over($("w")));
>
> This gives:
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not resolve over call.
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>
> and also :
> Table t = tupled3DsTable
>     .window(Over.partitionby($("transactionId")).orderBy($("handlingTime")).as("w"))
>     .select($("handlingTime"), $("transactionId"), $("originalEvent"), $("handlingTime").lag().as("previousHandlingTime"));
>
> Which does not work since it cannot find lag function :-(
>
> In java I have the following setup:
> WatermarkStrategy<Tuple3<Long, String, String>> wmstrategy =
>     WatermarkStrategy
>         .<Tuple3<Long, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
>         .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
>         .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<Long, String, String>>() {
>             @Override
>             public long extractTimestamp(Tuple3<Long, String, String> element, long handlingTime) {
>                 return element.f0;
>             }});
>
> DataStream<Tuple3<Long, String, String>> tuple3dswm =
>     tuple3ds.assignTimestampsAndWatermarks(wmstrategy);
>
> Table tupled3DsTable = tableEnv.fromDataStream(tuple3ds,
>     Schema.newBuilder()
>         .column("f0","TIMESTAMP_LTZ(3)")
>         .column("f1","STRING")
>         .column("f2","STRING")
>         .watermark("f0", "SOURCE_WATERMARK()")
>         .build())
>     .as("handlingTime", "transactionId", "originalEvent");
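
As a rough illustration of the bounded-out-of-orderness strategy in the quoted setup, the following plain-Java sketch models how such a watermark trails the highest timestamp seen so far and how an event can be classified as late. It is a simplified model, not Flink code: the class `WatermarkSketch` and its methods are hypothetical, and it evaluates the watermark on every event, whereas Flink emits watermarks periodically. Whether a late event is actually dropped depends on the downstream operator, which this sketch does not model.

```java
final class WatermarkSketch {

    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    WatermarkSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Start low enough that the initial watermark equals Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Watermark = highest timestamp seen so far, minus the allowed delay, minus 1 ms. */
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    /**
     * Records the event timestamp and reports whether the event was on time.
     * An event counts as late when its timestamp is at or below the watermark
     * that was in effect before the event arrived.
     */
    boolean accept(long eventTimestamp) {
        boolean late = eventTimestamp <= currentWatermark();
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        return !late;
    }

    public static void main(String[] args) {
        // Assumed out-of-orderness of two minutes, matching "a couple of minutes".
        WatermarkSketch wm = new WatermarkSketch(120_000L);
        System.out.println(wm.accept(1_000_000L)); // first event
        System.out.println(wm.accept(950_000L));   // 50 s behind, within the bound
        System.out.println(wm.accept(800_000L));   // 200 s behind, beyond the bound
        System.out.println(wm.currentWatermark());
    }
}
```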