Thanks. Would the option for the DataStream API be to write a MapPartitionFunction?
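To make the question concrete, here is roughly the per-transactionId logic I would need on the DataStream side. I assume it would live in something like a KeyedProcessFunction keyed by transactionId, using keyed state and event-time timers, rather than in a MapPartitionFunction. The sketch below is a plain-Java model with no Flink dependencies. All names in it (LagSketch, Event, Result, onEvent, onWatermark) are hypothetical: ordinary maps stand in for keyed state, and onWatermark stands in for timers firing as the watermark advances.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical plain-Java model (not Flink API) of per-key event-time ordering:
 * events are buffered per transactionId until the watermark passes them, then
 * emitted in handlingTime order with the time elapsed since the previous event
 * of the same transactionId. Events at or behind the watermark are dropped.
 */
class LagSketch {

    record Event(String transactionId, long handlingTime, String originalEvent) {}

    record Result(String transactionId, long handlingTime, long elapsedTime) {}

    // transactionId -> (handlingTime -> events not yet emitted), both sorted ascending
    private final TreeMap<String, TreeMap<Long, List<Event>>> buffers = new TreeMap<>();
    // transactionId -> handlingTime of the last emitted event (stand-in for keyed state)
    private final Map<String, Long> lastEmitted = new HashMap<>();
    private long currentWatermark = Long.MIN_VALUE;

    /** Buffers an event; returns false (and drops it) if it is at or behind the watermark. */
    boolean onEvent(Event e) {
        if (e.handlingTime() <= currentWatermark) {
            return false; // late: outside the out-of-orderness bound, ignored
        }
        buffers.computeIfAbsent(e.transactionId(), k -> new TreeMap<>())
               .computeIfAbsent(e.handlingTime(), k -> new ArrayList<>())
               .add(e);
        return true;
    }

    /** Advances the watermark and emits buffered events with handlingTime <= watermark, in order. */
    List<Result> onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Result> out = new ArrayList<>();
        Iterator<Map.Entry<String, TreeMap<Long, List<Event>>>> it = buffers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, TreeMap<Long, List<Event>>> entry = it.next();
            String key = entry.getKey();
            // View of all buffered timestamps <= watermark, ascending
            NavigableMap<Long, List<Event>> ready = entry.getValue().headMap(currentWatermark, true);
            for (List<Event> sameTime : ready.values()) {
                for (Event e : sameTime) {
                    Long prev = lastEmitted.get(key);
                    // First event of a transaction gets 0, i.e. handlingTime - COALESCE(LAG(...), handlingTime)
                    long elapsed = (prev == null) ? 0L : e.handlingTime() - prev;
                    out.add(new Result(key, e.handlingTime(), elapsed));
                    lastEmitted.put(key, e.handlingTime());
                }
            }
            ready.clear(); // clearing the view removes the emitted entries from the buffer
            if (entry.getValue().isEmpty()) {
                it.remove();
            }
        }
        return out;
    }
}
```

For example, with events at 1000 and 500 for tx1 and a watermark of 1000, the model emits tx1@500 (elapsed 0) and then tx1@1000 (elapsed 500). An event at 900 arriving after that watermark is dropped as late. Returning 0 for the first event of a transaction reflects what my `if(null(...))` expression was meant to do; plain LAG would return NULL there.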
On Wed, 16 Feb 2022 at 16:35, Francesco Guardiani <france...@ververica.com> wrote:

> > Which does not work since it cannot find the lag function :-(
>
> lag and over are not supported in the Table API at the moment, so you need
> to use SQL for that.
>
> > *Will this obey the watermark strategy of the original DataStream? (see
> > further below)*
>
> Yes. The code at the end of the mail is correct and should work fine. I
> have just one comment: if you're using this DataStream only to create the
> Table instance, you could also define the watermark directly with the
> Schema builder, as described here:
> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/Schema.Builder.html#watermark-java.lang.String-org.apache.flink.table.expressions.Expression-
>
> On Wed, Feb 16, 2022 at 2:35 PM HG <hanspeter.sl...@gmail.com> wrote:
>
>> Hello all,
>>
>> I need to calculate the difference in time between ordered rows per
>> transactionId. All events should arrive within the timeframe set by the
>> out-of-orderness (a couple of minutes). Events outside this time should
>> be ignored.
>>
>> In SQL this would be:
>>
>>     select transactionId, handlingTime,
>>            handlingTime - lag(handlingTime, 1) over (partition by transactionId order by handlingTime) as elapsedTime
>>     from table
>>
>> When I write:
>>
>>     Table result = tableEnv.sqlQuery("select transactionId, handlingTime, "
>>         + "handlingTime - if(null(lag(handlingTime) over (partition by transactionId order by handlingTime),handlingTime) as elapsedTime "
>>         + "from tupled3DsTableVw");
>>
>> *Will this obey the watermark strategy of the original DataStream? (see
>> further below)*
>>
>> I have also tried to use the Table API with a session window, like this:
>>
>>     Table t = tupled3DsTable
>>         .window(Session.withGap(lit(2).minutes()).on($("handlingTime")).as("w"))
>>         .groupBy($("transactionId"), $("w"))
>>         .select($("handlingTime"), $("transactionId"), $("originalEvent"),
>>             $("handlingTime").max().over($("w")));
>>
>> This gives:
>>
>>     org.apache.flink.client.program.ProgramInvocationException: The main
>>     method caused an error: Could not resolve over call.
>>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>>
>> and also:
>>
>>     Table t = tupled3DsTable
>>         .window(Over.partitionby($("transactionId")).orderBy($("handlingTime")).as("w"))
>>         .select($("handlingTime"), $("transactionId"), $("originalEvent"),
>>             $("handlingTime").lag().as("previousHandlingTime"));
>>
>> Which does not work since it cannot find the lag function :-(
>>
>> In Java I have the following setup:
>>
>>     WatermarkStrategy<Tuple3<Long, String, String>> wmstrategy =
>>         WatermarkStrategy
>>             .<Tuple3<Long, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
>>             .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
>>             .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<Long, String, String>>() {
>>                 @Override
>>                 public long extractTimestamp(Tuple3<Long, String, String> element, long handlingTime) {
>>                     return element.f0;
>>                 }
>>             });
>>
>>     DataStream<Tuple3<Long, String, String>> tuple3dswm =
>>         tuple3ds.assignTimestampsAndWatermarks(wmstrategy);
>>
>>     Table tupled3DsTable = tableEnv.fromDataStream(tuple3ds,
>>             Schema.newBuilder()
>>                 .column("f0", "TIMESTAMP_LTZ(3)")
>>                 .column("f1", "STRING")
>>                 .column("f2", "STRING")
>>                 .watermark("f0", "SOURCE_WATERMARK()")
>>                 .build())
>>         .as("handlingTime", "transactionId", "originalEvent");