Hi Hequn

Thanks for the details. I will give it a try. 

Med venlig hilsen / Best regards
Lasse Nedergaard


> Den 17. apr. 2019 kl. 04.09 skrev Hequn Cheng <chenghe...@gmail.com>:
> 
> Hi Lasse,
> 
> > some devices can deliver data days back in time and I would like to have 
> > the results as fast as possible.
> 
> What JingsongLee said is correct.
> 
> However, according to your description above, it's possible to handle your 
> problem with the Table API. You can use the non-window (or unbounded) 
> aggregate[1]. The non-window aggregate supports early firing, i.e., it outputs 
> results immediately once there is an update, so you can "have the results as 
> fast as possible". The query looks like:
> 
>   Table res30MinWindows = machineInsights
>       .select("UserActionTime / (30 * 60) as windowId, machineId, machineInsightId, value")
>       .groupBy("windowId, machineId, machineInsightId")
>       .select("machineId, machineInsightId, windowId as wStart, windowId + 1800 as sEnd, value.max as max")
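> 
> Since the non-window aggregate produces continuously updated results, you would 
> consume them as a retract stream rather than an append stream. A minimal sketch 
> (assuming tEnv is your StreamTableEnvironment; the variable names are just 
> placeholders):
> 
>   import org.apache.flink.api.java.tuple.Tuple2;
>   import org.apache.flink.streaming.api.datastream.DataStream;
>   import org.apache.flink.types.Row;
> 
>   // true flag = insert/update, false flag = retraction of a previous result
>   DataStream<Tuple2<Boolean, Row>> result =
>       tEnv.toRetractStream(res30MinWindows, Row.class);
>   result.print();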
> 
> The only thing to be aware of is that, since the non-window aggregate keeps all 
> (result) data in its state, the state required to compute the query result 
> might grow infinitely, depending on the type of aggregation and the number of 
> distinct grouping keys. To solve this problem, you can provide a query 
> configuration with a valid retention interval to prevent excessive state size[2].
> In your case, I think the valid retention interval would be the maximum delay 
> interval of your data. 
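> 
> A rough sketch of that configuration (again assuming tEnv is your 
> StreamTableEnvironment; the 2-day/3-day retention times are placeholders you 
> should adapt to your actual maximum delay):
> 
>   import org.apache.flink.api.common.time.Time;
>   import org.apache.flink.table.api.StreamQueryConfig;
> 
>   StreamQueryConfig qConfig = tEnv.queryConfig();
>   // a key's state is cleared if it has not been updated within this interval
>   qConfig.withIdleStateRetentionTime(Time.days(2), Time.days(3));
> 
>   // pass the config when emitting the table, e.g.:
>   // tEnv.toRetractStream(res30MinWindows, Row.class, qConfig);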
> 
> Best, Hequn
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/tableApi.html#aggregations
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/streaming.html
> 
> 
>> On Tue, Apr 16, 2019 at 5:38 PM Lasse Nedergaard <lassenederga...@gmail.com> 
>> wrote:
>> Hi
>> 
>> Thanks for the fast reply. Unfortunately it's not an option, as some devices 
>> can deliver data days back in time and I would like to have the results as 
>> fast as possible. 
>> I will have to convert my implementation to use the streaming API instead. 
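>> 
>> Something along these lines is what I have in mind - just a sketch, where the 
>> Tuple3 layout (machineId, machineInsightId, value) and the 2-day lateness 
>> bound are my own assumptions:
>> 
>>   import org.apache.flink.api.java.tuple.Tuple3;
>>   import org.apache.flink.streaming.api.datastream.DataStream;
>>   import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>>   import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
>>   import org.apache.flink.streaming.api.windowing.time.Time;
>>   import org.apache.flink.util.OutputTag;
>> 
>>   // machineInsights: DataStream<Tuple3<String, String, Double>> with
>>   // timestamps and watermarks already assigned
>>   final OutputTag<Tuple3<String, String, Double>> lateTag =
>>       new OutputTag<Tuple3<String, String, Double>>("late-machine-insights") {};
>> 
>>   SingleOutputStreamOperator<Tuple3<String, String, Double>> maxPerWindow = machineInsights
>>       .keyBy(0, 1)                                           // machineId, machineInsightId
>>       .window(TumblingEventTimeWindows.of(Time.minutes(30)))
>>       .allowedLateness(Time.days(2))                         // keep windows open for late records
>>       .sideOutputLateData(lateTag)                           // records later than that go here
>>       .max(2);                                               // max of value
>> 
>>   DataStream<Tuple3<String, String, Double>> tooLate = maxPerWindow.getSideOutput(lateTag);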
>> 
>> Med venlig hilsen / Best regards
>> Lasse Nedergaard
>> 
>> 
>>> Den 16. apr. 2019 kl. 11.08 skrev JingsongLee <lzljs3620...@aliyun.com>:
>>> 
>>> Hi @Lasse Nedergaard, the Table API doesn't have an allowedLateness API.
>>> But you can set rowtime.watermarks.delay on the source to slow down the 
>>> watermark clock.
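>>> 
>>> For example, with the connector descriptor API it could look roughly like 
>>> this (the Kafka connector, JSON format, field names, and the 2-day delay are 
>>> only an illustration - use your actual source definition and delay; tEnv is 
>>> your StreamTableEnvironment):
>>> 
>>>   import org.apache.flink.api.common.typeinfo.Types;
>>>   import org.apache.flink.table.descriptors.Json;
>>>   import org.apache.flink.table.descriptors.Kafka;
>>>   import org.apache.flink.table.descriptors.Rowtime;
>>>   import org.apache.flink.table.descriptors.Schema;
>>> 
>>>   tEnv.connect(new Kafka()
>>>           .version("0.11")
>>>           .topic("machine-insights")
>>>           .property("bootstrap.servers", "localhost:9092"))
>>>       .withFormat(new Json().deriveSchema())
>>>       .withSchema(new Schema()
>>>           .field("machineId", Types.STRING)
>>>           .field("machineInsightId", Types.STRING)
>>>           .field("value", Types.DOUBLE)
>>>           .field("UserActionTime", Types.SQL_TIMESTAMP)
>>>               .rowtime(new Rowtime()
>>>                   .timestampsFromField("UserActionTime")
>>>                   // sets rowtime.watermarks.delay: watermark lags 2 days behind
>>>                   .watermarksPeriodicBounded(2L * 24 * 60 * 60 * 1000)))
>>>       .inAppendMode()
>>>       .registerTableSource("MachineInsights");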
>>> 
>>> ------------------------------------------------------------------
>>> From: Lasse Nedergaard <lassenederga...@gmail.com>
>>> Sent: Tuesday, April 16, 2019, 16:20
>>> To: user <user@flink.apache.org>
>>> Subject: Is it possible to handle late data when using table API?
>>> 
>>> Hi.
>>> 
>>> I have a simple tumble window working on eventtime like this.
>>> 
>>> Table res30MinWindows = machineInsights
>>>     .window(Tumble.over("30.minutes").on("UserActionTime").as("w"))  // define window
>>>     .groupBy("machineId, machineInsightId, w")                       // group by key and window
>>>     .select("machineId, machineInsightId, w.start, w.end, w.rowtime, value.max as max");  // access window properties and aggregate
>>> 
>>> As we work with IoT units, we don't have 100% control over the event time 
>>> reported and therefore need to handle late data to ensure that we don't get 
>>> our calculations wrong.
>>> I would like to know if there is any option in the Table API to get access 
>>> to late data, or if my only option is to use the Streaming API.
>>> Thanks in advance
>>> Lasse Nedergaard
>>> 
>>> 
