Thanks

Would the option for the DataStream API be to write a MapPartitionFunction?
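For the DataStream route, the usual tool for per-key, event-time-ordered logic is a KeyedProcessFunction keyed by transactionId. As far as I know, MapPartitionFunction belongs to the DataSet API in the Flink versions discussed here. A rough outline: buffer each event in keyed state (e.g. MapState or ListState) and register an event-time timer for its timestamp. In onTimer, emit the buffered events in timestamp order, each with its difference to the previously emitted handlingTime. The standalone Java below simulates only that logic, without Flink, so it can run on its own. The class LagOverSketch, its methods and the millisecond timestamps are hypothetical names and assumptions for illustration, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;

// Hypothetical plain-Java simulation (no Flink dependency) of a per-transactionId "lag"
// computed in event-time order, with late events dropped.
final class LagOverSketch {

    static final class Result {
        final String transactionId;
        final long handlingTime;
        final long elapsedTime;

        Result(String transactionId, long handlingTime, long elapsedTime) {
            this.transactionId = transactionId;
            this.handlingTime = handlingTime;
            this.elapsedTime = elapsedTime;
        }

        @Override
        public String toString() {
            return transactionId + "," + handlingTime + "," + elapsedTime;
        }
    }

    private final long outOfOrdernessMs;
    private long maxTimestamp = Long.MIN_VALUE;
    private long watermark = Long.MIN_VALUE;
    // Per transactionId: buffered timestamps not yet covered by the watermark (min-heap).
    private final Map<String, PriorityQueue<Long>> buffers = new TreeMap<>();
    // Per transactionId: handlingTime of the last emitted event, i.e. the LAG value.
    private final Map<String, Long> previous = new TreeMap<>();
    private final List<Result> output = new ArrayList<>();

    LagOverSketch(long outOfOrdernessMs) {
        this.outOfOrdernessMs = outOfOrdernessMs;
    }

    // Returns false when the event is at or behind the watermark and is dropped as late.
    boolean process(String transactionId, long handlingTime) {
        if (handlingTime <= watermark) {
            return false;
        }
        buffers.computeIfAbsent(transactionId, k -> new PriorityQueue<>()).add(handlingTime);
        maxTimestamp = Math.max(maxTimestamp, handlingTime);
        // Bounded out-of-orderness: watermark = max timestamp - delay - 1.
        // (Flink emits this periodically; the sketch updates it on every event.)
        advanceWatermark(maxTimestamp - outOfOrdernessMs - 1);
        return true;
    }

    // Emits, in handlingTime order, every buffered event the new watermark has passed.
    void advanceWatermark(long newWatermark) {
        if (newWatermark <= watermark) {
            return;
        }
        watermark = newWatermark;
        for (Map.Entry<String, PriorityQueue<Long>> entry : buffers.entrySet()) {
            PriorityQueue<Long> buffer = entry.getValue();
            while (!buffer.isEmpty() && buffer.peek() <= watermark) {
                long t = buffer.poll();
                Long prev = previous.get(entry.getKey());
                // Same as handlingTime - IFNULL(LAG(handlingTime), handlingTime): first event gets 0.
                long elapsed = (prev == null) ? 0L : t - prev;
                output.add(new Result(entry.getKey(), t, elapsed));
                previous.put(entry.getKey(), t);
            }
        }
    }

    // End of input: flush everything still buffered.
    void finish() {
        advanceWatermark(Long.MAX_VALUE);
    }

    List<Result> results() {
        return output;
    }

    public static void main(String[] args) {
        LagOverSketch sketch = new LagOverSketch(2_000); // 2 s out-of-orderness
        sketch.process("tx1", 1_000);
        sketch.process("tx1", 3_000);
        sketch.process("tx1", 2_000);  // out of order, but within the bound
        sketch.process("tx2", 10_000); // watermark moves to 7_999
        sketch.process("tx1", 4_000);  // behind the watermark: dropped
        sketch.finish();
        sketch.results().forEach(System.out::println);
    }
}
```

Running main prints tx1,1000,0, tx1,2000,1000, tx1,3000,1000 and tx2,10000,0, one per line. The tx1 event at 4000 arrives after the watermark has reached 7999 and is dropped, matching the requirement that events outside the out-of-orderness bound be ignored. Unlike this sketch, a real KeyedProcessFunction would keep the buffer and the previous timestamp in Flink keyed state so that they are checkpointed.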

On Wed, 16 Feb 2022 at 16:35, Francesco Guardiani <france...@ververica.com> wrote:

> > Which does not work since it cannot find the lag function :-(
>
> LAG and OVER are not supported in the Table API at the moment, so you need
> to use SQL for that.
>
> > *Will this obey the watermark strategy of the original DataStream? (see
> > further below)*
>
> Yes. The code at the end of the mail is correct and should work fine. I
> have just one comment: if you're using this DataStream only to create the
> Table instance, you could also define the watermark directly with the Schema
> builder, as described here:
> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/Schema.Builder.html#watermark-java.lang.String-org.apache.flink.table.expressions.Expression-
>
> On Wed, Feb 16, 2022 at 2:35 PM HG <hanspeter.sl...@gmail.com> wrote:
>
>> Hello all
>>
>> I need to calculate the difference in time between ordered rows per
>> transactionId. All events should arrive within the timeframe set by the
>> out-of-orderness (a couple of minutes). Events outside this timeframe
>> should be ignored.
>>
>> In SQL this would be:
>>
>>   select transactionId, handlingTime,
>>          handlingTime - lag(handlingTime, 1) over (partition by transactionId order by handlingTime) as elapsedTime
>>   from table
>>
>> When I code:
>>
>>   Table result = tableEnv.sqlQuery(
>>       "select transactionId, handlingTime, "
>>     + "handlingTime - ifnull(lag(handlingTime) over (partition by transactionId order by handlingTime), handlingTime) as elapsedTime "
>>     + "from tupled3DsTableVw");
>>
>> *Will this obey the watermark strategy of the original DataStream? (see
>> further below)*
>>
>> I have also tried to use the Table API with a session window, like:
>>
>>   Table t = tupled3DsTable
>>       .window(Session.withGap(lit(2).minutes()).on($("handlingTime")).as("w"))
>>       .groupBy($("transactionId"), $("w"))
>>       .select($("handlingTime"), $("transactionId"), $("originalEvent"),
>>               $("handlingTime").max().over($("w")));
>>
>> This gives:
>>
>>   org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not resolve over call.
>>           at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>>
>> and also:
>>
>>   Table t = tupled3DsTable
>>       .window(Over.partitionBy($("transactionId")).orderBy($("handlingTime")).as("w"))
>>       .select($("handlingTime"), $("transactionId"), $("originalEvent"),
>>               $("handlingTime").lag().as("previousHandlingTime"));
>>
>> Which does not work since it cannot find the lag function :-(
>>
>> In Java I have the following setup:
>>
>>   WatermarkStrategy<Tuple3<Long, String, String>> wmstrategy = WatermarkStrategy
>>       .<Tuple3<Long, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
>>       .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
>>       .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<Long, String, String>>() {
>>           @Override
>>           public long extractTimestamp(Tuple3<Long, String, String> element, long recordTimestamp) {
>>               // f0 holds handlingTime in epoch milliseconds
>>               return element.f0;
>>           }
>>       });
>>
>>   DataStream<Tuple3<Long, String, String>> tuple3dswm =
>>       tuple3ds.assignTimestampsAndWatermarks(wmstrategy);
>>
>>   // Use the stream that carries the watermarks, so SOURCE_WATERMARK() has watermarks to propagate
>>   Table tupled3DsTable = tableEnv.fromDataStream(
>>           tuple3dswm,
>>           Schema.newBuilder()
>>               .column("f0", "TIMESTAMP_LTZ(3)")
>>               .column("f1", "STRING")
>>               .column("f2", "STRING")
>>               .watermark("f0", "SOURCE_WATERMARK()")
>>               .build())
>>       .as("handlingTime", "transactionId", "originalEvent");