Why do you need MapPartitionFunction?

On Wed, Feb 16, 2022 at 7:02 PM HG <hanspeter.sl...@gmail.com> wrote:
> Thanks
>
> Would the option for datastream be to write a MapPartitionFunction?
>
> On Wed, Feb 16, 2022 at 4:35 PM Francesco Guardiani <france...@ververica.com> wrote:
>
>> > Which does not work since it cannot find lag function :-(
>>
>> lag and over are not supported at the moment with Table, so you need to
>> use SQL for that.
>>
>> > *Will this obey the watermark strategy of the original Datastream?
>> > (see further below)*
>>
>> Yes. The code at the end of the mail is correct and should work fine. I
>> have just one comment: if you're using this DataStream only to create the
>> Table instance, you could also just define the watermark using the Schema
>> builder itself, as described here:
>> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/Schema.Builder.html#watermark-java.lang.String-org.apache.flink.table.expressions.Expression-
>>
>> On Wed, Feb 16, 2022 at 2:35 PM HG <hanspeter.sl...@gmail.com> wrote:
>>
>>> Hello all
>>>
>>> I need to calculate the difference in time between ordered rows per
>>> transactionId. All events should arrive within the timeframe set by the
>>> out-of-orderness (a couple of minutes). Events outside this time should be
>>> ignored.
>>>
>>> In SQL this would be :
>>> select transactionId , handlingTime , *handlingTime -
>>> lag(handlingTime,1) over (partition by transactionId order by handlingTime)
>>> as elapsedTime*
>>> from table
>>>
>>> When I code :
>>> Table result = tableEnv.sqlQuery("select transactionId, handlingTime,
>>> *handlingTime
>>> - if(null(lag(handlingTime) over (partition by transactionId order by
>>> handlingTime),handlingTime) as elapsedTime* from tupled3DsTableVw")
>>>
>>> *Will this obey the watermark strategy of the original Datastream?
>>> (see further below)*
>>> I have also tried to use the Table Api with a session window like :
>>> Table t = tupled3DsTable
>>>     .window(Session.withGap(lit(2).minutes()).on($("handlingTime")).as("w"))
>>>     .groupBy($("transactionId"), $("w"))
>>>     .select($("handlingTime"), $("transactionId"), $("originalEvent"),
>>>         $("handlingTime").max().over($("w")));
>>> This gives:
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error: Could not resolve over call.
>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>>>
>>> and also :
>>> Table t = tupled3DsTable
>>>     .window(Over.partitionby($("transactionId")).orderBy($("handlingTime")).as("w"))
>>>     .select($("handlingTime"), $("transactionId"), $("originalEvent"),
>>>         $("handlingTime").lag().as("previousHandlingTime"));
>>> Which does not work since it cannot find lag function :-(
>>>
>>> In java I have the following setup:
>>> WatermarkStrategy<Tuple3<Long, String, String>> wmstrategy =
>>>     WatermarkStrategy
>>>         .<Tuple3<Long, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
>>>         .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
>>>         .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<Long, String, String>>() {
>>>             @Override
>>>             public long extractTimestamp(Tuple3<Long, String, String> element, long handlingTime) {
>>>                 return element.f0;
>>>             }});
>>>
>>> DataStream<Tuple3<Long, String, String>> tuple3dswm =
>>>     tuple3ds.assignTimestampsAndWatermarks(wmstrategy);
>>>
>>> Table tupled3DsTable = tableEnv.fromDataStream(tuple3ds,
>>>     Schema.newBuilder().column("f0","TIMESTAMP_LTZ(3)").column("f1","STRING").column("f2","STRING").watermark("f0", "SOURCE_WATERMARK()")
>>>         .build()).as("handlingTime", "transactionId", "originalEvent");
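
For reference, here is a minimal plain-Java sketch of the result the lag-based query in the original mail is meant to produce: per transactionId, rows ordered by handlingTime, and each row's elapsedTime equal to its handlingTime minus the previous one. This is not Flink code. The class and method names (ElapsedTimeSketch, elapsedPerTransaction) are hypothetical, handlingTime is assumed to be epoch milliseconds, and the first row of each transaction is assumed to get an elapsed time of 0, which appears to be the intent of substituting handlingTime when lag returns null. Watermarks and the dropping of late events are not modelled here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ElapsedTimeSketch {

    /**
     * For each transactionId, sorts the handling times (assumed epoch millis) and
     * returns, per row, handlingTime minus the previous handlingTime of the same
     * transaction. The first row has no predecessor, so its elapsed time is 0.
     */
    public static Map<String, List<Long>> elapsedPerTransaction(Map<String, List<Long>> handlingTimes) {
        Map<String, List<Long>> result = new LinkedHashMap<>();
        // Each map entry plays the role of one "partition by transactionId" group.
        for (Map.Entry<String, List<Long>> entry : handlingTimes.entrySet()) {
            List<Long> sorted = new ArrayList<>(entry.getValue());
            Collections.sort(sorted);          // "order by handlingTime"
            List<Long> elapsed = new ArrayList<>();
            Long previous = null;              // stands in for lag(handlingTime, 1); null on the first row
            for (Long current : sorted) {
                elapsed.add(previous == null ? 0L : current - previous);
                previous = current;
            }
            result.put(entry.getKey(), elapsed);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<Long>> input = new LinkedHashMap<>();
        input.put("tx-1", List.of(1_000L, 4_500L, 2_000L)); // arrives out of order
        input.put("tx-2", List.of(10_000L));
        // prints {tx-1=[0, 1000, 2500], tx-2=[0]}
        System.out.println(elapsedPerTransaction(input));
    }
}
```

Comparing a small sample like this against the output of the SQL query can help confirm that the lag/over expression behaves as intended before looking at the watermark behaviour.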