Why do you need MapPartitionFunction?

On Wed, Feb 16, 2022 at 7:02 PM HG <hanspeter.sl...@gmail.com> wrote:

> Thanks
>
> Would the option for the DataStream API be to write a MapPartitionFunction?
>
> On Wed, 16 Feb 2022 at 16:35, Francesco Guardiani <france...@ververica.com> wrote:
>
>> > Which does not work since it cannot find the lag function :-(
>>
>> LAG and OVER are not supported in the Table API at the moment, so you
>> need to use SQL for that.
>>
>> > Will this obey the watermark strategy of the original DataStream? (see
>> > further below)
>>
>> Yes. The code at the end of the mail is correct and should work fine. I
>> have just one comment: if you're using this DataStream only to create the
>> Table instance, you could also define the watermark directly with the
>> Schema builder, as described here:
>> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/Schema.Builder.html#watermark-java.lang.String-org.apache.flink.table.expressions.Expression-
>>
>> On Wed, Feb 16, 2022 at 2:35 PM HG <hanspeter.sl...@gmail.com> wrote:
>>
>>> Hello all
>>>
>>> I need to calculate the time difference between consecutive rows, ordered
>>> by handlingTime, per transactionId. All events should arrive within the
>>> timeframe set by the out-of-orderness (a couple of minutes). Events
>>> outside this timeframe should be ignored.
>>>
>>> In SQL this would be:
>>> select transactionId, handlingTime,
>>>        handlingTime - lag(handlingTime, 1) over (partition by transactionId order by handlingTime) as elapsedTime
>>> from table
>>>
>>> When I code:
>>> Table result = tableEnv.sqlQuery(
>>>     "select transactionId, handlingTime, "
>>>     + "handlingTime - ifnull(lag(handlingTime) over (partition by transactionId "
>>>     + "order by handlingTime), handlingTime) as elapsedTime "
>>>     + "from tupled3DsTableVw");
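To check what the query above should return, here is a minimal plain-Java model of it (not Flink code). It assumes the rows of each transactionId are already in handlingTime order, which, as far as I understand, Flink's event-time OVER operator ensures by buffering rows until the watermark passes them, and that handlingTime is epoch milliseconds. Mirroring ifnull(lag(handlingTime), handlingTime), the first row of each transactionId gets an elapsed time of 0. The class, record and method names are hypothetical. In the DataStream API the `previous` map would roughly correspond to per-key state, for example a ValueState<Long> in a KeyedProcessFunction keyed by transactionId; that port is my assumption, not something confirmed in this thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LagSketch {

    // Hypothetical input row: transactionId plus handlingTime in epoch milliseconds.
    record Event(String transactionId, long handlingTime) {}

    // Hypothetical output row: input fields plus elapsedTime in milliseconds.
    record Result(String transactionId, long handlingTime, long elapsedTime) {}

    // Models: handlingTime - ifnull(lag(handlingTime) over (partition by transactionId
    // order by handlingTime), handlingTime), for rows already sorted per transactionId.
    static List<Result> elapsedTimes(List<Event> events) {
        // Last handlingTime seen per transactionId; plays the role of keyed state.
        Map<String, Long> previous = new HashMap<>();
        List<Result> out = new ArrayList<>();
        for (Event e : events) {
            // ifnull default: with no previous row, compare the row with itself, giving 0.
            long prev = previous.getOrDefault(e.transactionId(), e.handlingTime());
            out.add(new Result(e.transactionId(), e.handlingTime(), e.handlingTime() - prev));
            previous.put(e.transactionId(), e.handlingTime());
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("tx1", 1_000L),
                new Event("tx2", 1_500L),
                new Event("tx1", 4_000L),
                new Event("tx1", 4_250L),
                new Event("tx2", 2_000L));
        for (Result r : elapsedTimes(events)) {
            System.out.println(r);
        }
    }
}
```

For the sample input the elapsed times are 0, 0, 3000, 250 and 500 ms.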
>>>
>>> Will this obey the watermark strategy of the original DataStream? (see
>>> further below)
>>> I have also tried to use the Table API with a session window like:
>>> Table t = tupled3DsTable
>>>     .window(Session.withGap(lit(2).minutes()).on($("handlingTime")).as("w"))
>>>     .groupBy($("transactionId"), $("w"))
>>>     .select($("handlingTime"), $("transactionId"), $("originalEvent"),
>>>             $("handlingTime").max().over($("w")));
>>> This gives:
>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not resolve over call.
>>>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>>>
>>> and also:
>>> Table t = tupled3DsTable
>>>     .window(Over.partitionBy($("transactionId")).orderBy($("handlingTime")).as("w"))
>>>     .select($("handlingTime"), $("transactionId"), $("originalEvent"),
>>>             $("handlingTime").lag().as("previousHandlingTime"));
>>> Which does not work since it cannot find the lag function :-(
>>>
>>> In Java I have the following setup:
>>> WatermarkStrategy<Tuple3<Long, String, String>> wmstrategy = WatermarkStrategy
>>>         .<Tuple3<Long, String, String>>forBoundedOutOfOrderness(
>>>                 Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
>>>         .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
>>>         .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<Long, String, String>>() {
>>>             @Override
>>>             public long extractTimestamp(Tuple3<Long, String, String> element, long recordTimestamp) {
>>>                 // f0 carries handlingTime in epoch milliseconds
>>>                 return element.f0;
>>>             }
>>>         });
>>>
>>> DataStream<Tuple3<Long, String, String>> tuple3dswm =
>>>         tuple3ds.assignTimestampsAndWatermarks(wmstrategy);
>>>
>>> Table tupled3DsTable = tableEnv.fromDataStream(
>>>                 tuple3dswm, // the stream with timestamps and watermarks assigned
>>>                 Schema.newBuilder()
>>>                         .column("f0", "TIMESTAMP_LTZ(3)")
>>>                         .column("f1", "STRING")
>>>                         .column("f2", "STRING")
>>>                         .watermark("f0", "SOURCE_WATERMARK()")
>>>                         .build())
>>>         .as("handlingTime", "transactionId", "originalEvent");
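To reason about which events fall "outside the out-of-orderness", here is a minimal plain-Java model (not Flink code) of a bounded-out-of-orderness watermark. It assumes, as I understand Flink's built-in BoundedOutOfOrdernessWatermarks to behave, that the watermark is the largest timestamp seen so far minus the out-of-orderness minus 1 ms, and that the event-time OVER operator drops a row as late when its timestamp is not after the current watermark. The model updates the watermark after every event and ignores withIdleness; Flink emits watermarks periodically, so in practice a few more stragglers may be accepted. The class name and the 2-minute bound are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkSketch {

    private final long outOfOrdernessMillis;
    // Largest event timestamp seen so far; initialised so the first watermark is Long.MIN_VALUE.
    private long maxTimestamp;

    WatermarkSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // Watermark = largest timestamp seen - out-of-orderness - 1 ms.
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    // Returns true if the event is accepted, false if it is late (timestamp <= watermark).
    boolean onEvent(long timestamp) {
        if (timestamp <= currentWatermark()) {
            return false;
        }
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        return true;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(120_000L); // hypothetical 2-minute bound
        long[] arrivals = {0L, 60_000L, 300_000L, 190_000L, 170_000L}; // arrival order
        List<Long> accepted = new ArrayList<>();
        for (long ts : arrivals) {
            if (wm.onEvent(ts)) {
                accepted.add(ts);
            }
        }
        System.out.println("accepted=" + accepted + " watermark=" + wm.currentWatermark());
    }
}
```

This prints `accepted=[0, 60000, 300000, 190000] watermark=179999`: the event at 190000 ms is still within the 2-minute bound of the maximum (300000 ms), while the one at 170000 ms is dropped as late.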
