Hello all

I need to calculate the time difference between consecutive ordered rows per
transactionId. All events should arrive within the timeframe set by the
out-of-orderness (a couple of minutes). Events outside this timeframe should be
ignored.

In SQL this would be:
select transactionId, handlingTime,
       handlingTime - lag(handlingTime, 1) over (partition by transactionId order by handlingTime) as elapsedTime
from table
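
To make the intended result concrete, here is a minimal plain-Java sketch (no Flink involved) of what the query above is meant to compute on a bounded list of events. The Event and Row classes, the method name and the sample values are made up for illustration, and handlingTime is assumed to be epoch milliseconds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ElapsedTimeSketch {

    /** Hypothetical event type holding only the two fields the query uses. */
    static final class Event {
        final String transactionId;
        final long handlingTime; // assumed: epoch milliseconds
        Event(String transactionId, long handlingTime) {
            this.transactionId = transactionId;
            this.handlingTime = handlingTime;
        }
    }

    /** Hypothetical result row; elapsedTime is null for the first row of a partition, as with LAG. */
    static final class Row {
        final String transactionId;
        final long handlingTime;
        final Long elapsedTime;
        Row(String transactionId, long handlingTime, Long elapsedTime) {
            this.transactionId = transactionId;
            this.handlingTime = handlingTime;
            this.elapsedTime = elapsedTime;
        }
    }

    static List<Row> elapsedPerTransaction(List<Event> events) {
        // Equivalent of: partition by transactionId order by handlingTime
        List<Event> sorted = new ArrayList<>(events);
        sorted.sort(Comparator.comparing((Event e) -> e.transactionId)
                .thenComparingLong(e -> e.handlingTime));

        // Last handlingTime seen per transactionId, i.e. lag(handlingTime, 1)
        Map<String, Long> previous = new HashMap<>();
        List<Row> rows = new ArrayList<>();
        for (Event e : sorted) {
            Long prev = previous.put(e.transactionId, e.handlingTime);
            Long elapsed = (prev == null) ? null : e.handlingTime - prev;
            rows.add(new Row(e.transactionId, e.handlingTime, elapsed));
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("A", 1000L), new Event("B", 1500L),
                new Event("A", 4000L), new Event("A", 2500L),
                new Event("B", 1700L));
        for (Row r : elapsedPerTransaction(events)) {
            System.out.println(r.transactionId + " " + r.handlingTime + " " + r.elapsedTime);
        }
    }
}
```

This sorts the whole input up front, which a streaming job cannot do; it only illustrates the expected values, not how Flink would evaluate the OVER window.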

When I code:
Table result = tableEnv.sqlQuery(
    "select transactionId, handlingTime, "
        + "handlingTime - ifnull(lag(handlingTime) over (partition by transactionId order by handlingTime), handlingTime) as elapsedTime "
        + "from tupled3DsTableVw");

*Will this obey the watermark strategy of the original DataStream? (see
further below)*

I have also tried to use the Table API with a session window, like:
Table t = tupled3DsTable
    .window(Session.withGap(lit(2).minutes()).on($("handlingTime")).as("w"))
    .groupBy($("transactionId"), $("w"))
    .select($("handlingTime"), $("transactionId"), $("originalEvent"),
        $("handlingTime").max().over($("w")));
This gives:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not resolve over call.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)

and also:
Table t = tupled3DsTable
    .window(Over.partitionBy($("transactionId")).orderBy($("handlingTime")).as("w"))
    .select($("handlingTime"), $("transactionId"), $("originalEvent"),
        $("handlingTime").lag().as("previousHandlingTime"));
This does not work because it cannot find the lag function :-(

In Java I have the following setup:
WatermarkStrategy<Tuple3<Long, String, String>> wmstrategy = WatermarkStrategy
    .<Tuple3<Long, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
    .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
    .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<Long, String, String>>() {
        @Override
        public long extractTimestamp(Tuple3<Long, String, String> element, long handlingTime) {
            return element.f0;
        }
    });

DataStream<Tuple3<Long, String, String>> tuple3dswm =
    tuple3ds.assignTimestampsAndWatermarks(wmstrategy);

Table tupled3DsTable = tableEnv.fromDataStream(tuple3ds,
        Schema.newBuilder()
            .column("f0", "TIMESTAMP_LTZ(3)")
            .column("f1", "STRING")
            .column("f2", "STRING")
            .watermark("f0", "SOURCE_WATERMARK()")
            .build())
    .as("handlingTime", "transactionId", "originalEvent");
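
For reference, this is roughly the behaviour I expect from the out-of-orderness bound, written as a simplified plain-Java sketch rather than Flink code. It assumes the watermark trails the highest timestamp seen so far by the bound plus one millisecond, and that an event at or behind the watermark counts as late. The class and method names are made up for illustration, and unlike Flink, which emits watermarks periodically, the sketch advances the watermark on every event.

```java
import java.time.Duration;

final class BoundedOutOfOrdernessSketch {

    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(Duration maxOutOfOrderness) {
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // Start low enough that computing the first watermark cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Watermark assumed here: highest timestamp seen, minus the bound, minus 1 ms. */
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    /**
     * Returns true if the event is still on time, false if it is at or behind
     * the current watermark and should therefore be ignored.
     */
    boolean accept(long eventTimestamp) {
        boolean onTime = eventTimestamp > currentWatermark();
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        return onTime;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(Duration.ofMinutes(2));
        System.out.println(wm.accept(10_000L));   // first event
        System.out.println(wm.accept(200_000L));  // moves the watermark forward
        System.out.println(wm.accept(79_999L));   // more than 2 minutes behind the newest event
    }
}
```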
