Hi Hequn

Thanks for the details. I will give it a try.
Med venlig hilsen / Best regards
Lasse Nedergaard

> On 17 Apr 2019, at 04:09, Hequn Cheng <chenghe...@gmail.com> wrote:
>
> Hi Lasse,
>
> > some devices can deliver data days back in time and I would like to have
> > the results as fast as possible.
>
> What JingsongLee said is correct.
>
> However, it's possible to handle your problem with the Table API, given your
> description above. You can use a non-window (or unbounded) aggregate[1].
> The non-window aggregate supports early fire, i.e., it outputs results
> immediately once there is an update, so you can "have the results as fast as
> possible". The query looks like:
>
>     Table res30MinWindows = machineInsights
>         .select("UserActionTime / (30 * 60) as windowId, machineId, machineInsightId, value")
>         .groupBy("windowId, machineId, machineInsightId")
>         .select("machineId, machineInsightId, windowId as wStart, windowId + 1800 as wEnd, value.max as max")
>
> The only thing you have to keep in mind is that, because a non-window
> aggregate keeps all (result) data in its state, the state required to compute
> the query result might grow infinitely, depending on the type of aggregation
> and the number of distinct grouping keys. To solve this problem, you can
> provide a query configuration with a valid retention interval to prevent
> excessive state size[2].
> In your case, I think a valid retention interval would be the maximum delay
> interval of your data.
>
> Best, Hequn
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/tableApi.html#aggregations
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/streaming.html
>
>
>> On Tue, Apr 16, 2019 at 5:38 PM Lasse Nedergaard <lassenederga...@gmail.com> wrote:
>>
>> Hi
>>
>> Thanks for the fast reply. Unfortunately it is not an option, as some devices
>> can deliver data days back in time and I would like to have the results as
>> fast as possible.
>> I will have to convert my implementation to use the streaming API instead.
>>
>> Med venlig hilsen / Best regards
>> Lasse Nedergaard
>>
>>
>>> On 16 Apr 2019, at 11:08, JingsongLee <lzljs3620...@aliyun.com> wrote:
>>>
>>> Hi @Lasse Nedergaard, the Table API doesn't have an allowedLateness API.
>>> But you can set rowtime.watermarks.delay on the source to slow down the
>>> watermark clock.
>>>
>>> ------------------------------------------------------------------
>>> From: Lasse Nedergaard <lassenederga...@gmail.com>
>>> Sent: Tuesday, April 16, 2019, 16:20
>>> To: user <user@flink.apache.org>
>>> Subject: Is it possible to handle late data when using table API?
>>>
>>> Hi.
>>>
>>> I have a simple tumble window working on event time like this:
>>>
>>>     Table res30MinWindows = machineInsights
>>>         .window(Tumble.over("30.minutes").on("UserActionTime").as("w"))  // define window
>>>         .groupBy("machineId, machineInsightId, w")                       // group by key and window
>>>         .select("machineId, machineInsightId, w.start, w.end, w.rowtime, value.max as max"); // access window properties and aggregate
>>>
>>> As we work with IoT units we don't have 100% control over the event time
>>> reported, and therefore need to handle late data to ensure that we don't get
>>> our calculations wrong.
>>> I would like to know whether there is any option in the Table API to get
>>> access to late data, or whether my only option is to use the streaming API.
>>>
>>> Thanks in advance
>>> Lasse Nedergaard
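
[Editor's note] For reference, below is a minimal Java sketch of Hequn's suggestion: the non-window
(unbounded) aggregate combined with a query configuration that bounds idle state retention. It assumes
UserActionTime is an epoch timestamp in seconds; the class name, the stand-in Tuple4 source, and the
3/4-day retention values are illustrative placeholders (the thread only says the retention should match
the maximum delay of the devices), not code from the thread.

    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.StreamQueryConfig;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class UnboundedAggregateSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);

            // Retain idle state only as long as the maximum expected delay of the
            // devices (days, per the thread); the concrete values are placeholders.
            StreamQueryConfig qConfig = tableEnv.queryConfig();
            qConfig.withIdleStateRetentionTime(Time.days(3), Time.days(4));

            // Stand-in source: (machineId, machineInsightId, value, UserActionTime),
            // with UserActionTime assumed to be an epoch timestamp in seconds.
            DataStream<Tuple4<Integer, Integer, Double, Long>> machineInsightStream =
                env.fromElements(Tuple4.of(1, 7, 42.0, 1555500000L));

            Table machineInsights = tableEnv.fromDataStream(
                machineInsightStream, "machineId, machineInsightId, value, UserActionTime");

            // Non-window (unbounded) aggregate: one group per 30-minute bucket.
            // Results are re-emitted immediately whenever late data updates a bucket.
            Table res30MinWindows = machineInsights
                .select("UserActionTime / (30 * 60) as windowId, machineId, machineInsightId, value")
                .groupBy("windowId, machineId, machineInsightId")
                .select("machineId, machineInsightId, windowId as wStart, windowId + 1800 as wEnd, value.max as max");

            // The result is an updating table, so convert it to a retract stream and
            // pass the query config so the retention interval actually applies.
            DataStream<Tuple2<Boolean, Row>> result =
                tableEnv.toRetractStream(res30MinWindows, Row.class, qConfig);
            result.print();

            env.execute("non-window aggregate with state retention");
        }
    }

Each record in the retract stream carries a Boolean flag: true for an added or updated result row,
false for a retraction of a previously emitted row, which is how late updates to a bucket are reflected
downstream.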