Im wondering how does that work, it seems that a table function still takes a single row's values as an input, am i wrong (or at least that is how the examples show)? How would the SQL look like?
On Fri, Oct 12, 2018 at 9:15 PM Hequn Cheng <chenghe...@gmail.com> wrote: > Hi shkob1, > > > while one is time(session inactivity) the other is based on a specific > event marked as a "last" event. > How about using a session window and an udtf[1] to solve the problem. The > session window may output multi `last` elements. However, we can use a udtf > to split them into single ones. Thus, we can use SQL for the whole job. > > Best, Hequn. > > [1] > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-functions > > On Sat, Oct 13, 2018 at 2:28 AM shkob1 <shahar.kobrin...@gmail.com> wrote: > >> Hey! >> >> I have a use case in which im grouping a stream by session id - so far >> pretty standard, note that i need to do it through SQL and not by the >> table >> api. >> In my use case i have 2 trigger conditions though - while one is time >> (session inactivity) the other is based on a specific event marked as a >> "last" event. >> AFAIK SQL does not support custom triggers - so what i end up doing is >> doing >> group by in the SQL - then converting the result to a stream along with a >> boolean field that marks whether at least one of the events was the end >> event - then adding my custom trigger on top of it. >> It looks something like this: >> >> Table result = tableEnv.sqlQuery("select atLeastOneTrue(lastEvent), >> sessionId, count(*) FROM source Group By sessionId"); >> tableEnv.toRetractStream(result, Row.class, streamQueryConfig) >> .filter(tuple -> tuple.f0) >> .map(...) >> .returns(...) >> .keyBy("sessionId") >> .window(EventTimeSessionWindows.withGap(Time.hours(4))) >> .trigger(new SessionEndedByTimeOrEndTrigger()) >> .process(...take last element from the group by result..) >> >> This seems like a weird work around to, isn't it? my window is basically >> of >> the SQL result rather than on the source stream. Ideally i would keyby the >> sessionId before running the SQL but then a) would I need to register a >> table per key? b) would i be able to use the custom trigger per window? >> >> basically i want to group by session id and have a window for every >> session >> that supports both time and custom trigger. Assuming i need to use SQL >> (reason is the query is dynamically loaded), is there a better solution >> for >> it? >> >> >> >> >> >> >> >> >> >> >> -- >> Sent from: >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >> >