> Which does not work since it cannot find lag function :-(

lag and over are not supported at the moment with Table, so you need to use
SQL for that.

> *Will this obey the watermark strategy of the original Datastream? (see
further below)*

Yes. The code at the end of the mail is correct and should work fine. I
have just one comment, if you're using this DataStream only to create the
Table instance, you could also just define the watermark using the Schema
builder itself, as described here:
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/Schema.Builder.html#watermark-java.lang.String-org.apache.flink.table.expressions.Expression-

On Wed, Feb 16, 2022 at 2:35 PM HG <hanspeter.sl...@gmail.com> wrote:

> Hello all
>
> I need to calculate the difference in time between ordered rows per
> transactionId. All events should arrive within the timeframe set by the
> out-of-orderness ( a couple of minutes). Events outside this time should be
> ignored.
>
> In SQL this would be :
> select transactionId  , handlingTime , *handlingTime -
> lag(handlingTime,1) over (partition by transactionId order by handlingTime)
> as elapsedTime*
> from table
>
> When I code :
> Table result = tableEnv.sqlQuery("select transactionId, handlingTime, 
> *handlingTime
> - if(null(lag(handlingTime) over (partition by transactionId order by
> handlingTime),handlingTime) as elapsedTime* from tupled3DsTableVw")
>
> *Will this obey the watermark strategy of the original Datastream? (see
> further below)*
> I have also tried to use the Table Api with a session window like :
> Table t = tupled3DsTable
>    .window(Session.withGap(lit(2).minutes()).on($("handlingTime")).as("w"
> )).groupBy($("transactionId"), $("w"))
>    .select($("handlingTime"), $("transactionId"), $("originalEvent"), $(
> "handlingTime").max().over($("w")));
> This gives:
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Could not resolve over call.
>         at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>
> and also :
> Table t = tupled3DsTable
>         .window(Over.partitionby($("transactionId")).orderBy($(
> "handlingTime")).as("w")).select($("handlingTime"), $("transactionId"), $(
> "originalEvent"), $("handlingTime").lag().as("previousHandlingTime"));
> Which does not work since it cannot find lag function :-(
>
> In java I have the following setup:
> WatermarkStrategy<Tuple3<Long, String, String>> wmstrategy =
> WatermarkStrategy
>         .<Tuple3<Long, String,
> String>>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
>         .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
>         .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<Long,
> String, String>>() {
>             @Override
>             public long extractTimestamp(Tuple3<Long, String, String>
> element, long handlingTime) {
>             return element.f0;
>          }});
>
> DataStream<Tuple3<Long, String, String>> tuple3dswm = 
> tuple3ds.assignTimestampsAndWatermarks(wmstrategy);
>
> Table tupled3DsTable = tableEnv.fromDataStream(tuple3ds, 
> Schema.newBuilder().column("f0","TIMESTAMP_LTZ(3)").column("f1","STRING").column("f2","STRING").watermark("f0",
>  "SOURCE_WATERMARK()")
>             .build()).as("handlingTime", "transactionId", "originalEvent");
>
>
>
>
>
>

Reply via email to