Hi Shahar,

The table function takes a single row as input but can output multiple rows. You can split the row based on the "last" event. The code looks like:

val sessionResult =
  "SELECT " +
  "  lastUDAF(line) AS lastEvents " +
  "FROM MyTable " +
  "GROUP BY SESSION(rowtime, INTERVAL '4' HOUR)"
val result =
  s"SELECT lastEvent FROM ($sessionResult), " +
  "LATERAL TABLE(splitUDTF(lastEvents)) as T(lastEvent)"

The lastUDAF is used to process data within the session window. Since your lastEvent is based on either the window end or a special "last" event, the lastUDAF may output multiple last events. After the window, we apply a splitUDTF to split the lastEvents into single events.
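The thread does not include the bodies of the two functions, so the following is only a minimal sketch of what they could look like. The class names, the comma-joined string encoding, and the isLastEvent predicate are illustrative assumptions; events are modeled as plain strings:

import scala.collection.mutable.ArrayBuffer
import org.apache.flink.table.functions.{AggregateFunction, TableFunction}

// Accumulator: the "last" events of already-closed sub-sessions, plus the
// most recent event in case the session is closed by inactivity instead.
class LastEventsAcc {
  val closed = ArrayBuffer[String]()
  var current: String = _
}

// Sketch of lastUDAF: collects one entry per event flagged as "last" and,
// when the window fires, also the newest event of a still-open sub-session.
class LastUDAF extends AggregateFunction[String, LastEventsAcc] {

  override def createAccumulator(): LastEventsAcc = new LastEventsAcc

  // called once per row in the window; a flagged row closes a sub-session
  def accumulate(acc: LastEventsAcc, line: String): Unit = {
    if (isLastEvent(line)) {
      acc.closed += line
      acc.current = null
    } else {
      acc.current = line
    }
  }

  // emitted once when the session window fires
  override def getValue(acc: LastEventsAcc): String = {
    val all = if (acc.current != null) acc.closed :+ acc.current else acc.closed
    all.mkString(",")
  }

  // hypothetical end-of-session predicate; replace with your own flag
  private def isLastEvent(line: String): Boolean = line.endsWith("|LAST")
}

// Sketch of splitUDTF: emits one row per collected last event.
class SplitUDTF extends TableFunction[String] {
  def eval(lastEvents: String): Unit = lastEvents.split(",").foreach(collect)
}

Both functions would have to be registered before running the queries, e.g. tableEnv.registerFunction("lastUDAF", new LastUDAF) and tableEnv.registerFunction("splitUDTF", new SplitUDTF).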
Best,
Hequn

On Wed, Oct 17, 2018 at 12:38 AM Shahar Cizer Kobrinsky <shahar.kobrin...@gmail.com> wrote:

> I'm wondering how that works. It seems that a table function still takes a
> single row's values as input, am I wrong (or at least that is how the
> examples show it)?
> How would the SQL look?
>
> On Fri, Oct 12, 2018 at 9:15 PM Hequn Cheng <chenghe...@gmail.com> wrote:
>
>> Hi shkob1,
>>
>> > while one is time (session inactivity) the other is based on a specific
>> > event marked as a "last" event.
>> How about using a session window and a udtf [1] to solve the problem? The
>> session window may output multiple `last` elements. However, we can use a
>> udtf to split them into single ones. Thus, we can use SQL for the whole
>> job.
>>
>> Best, Hequn.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-functions
>>
>> On Sat, Oct 13, 2018 at 2:28 AM shkob1 <shahar.kobrin...@gmail.com> wrote:
>>
>>> Hey!
>>>
>>> I have a use case in which I'm grouping a stream by session id - so far
>>> pretty standard. Note that I need to do it through SQL and not through
>>> the Table API.
>>> In my use case I have two trigger conditions though - while one is time
>>> (session inactivity), the other is based on a specific event marked as a
>>> "last" event.
>>> AFAIK SQL does not support custom triggers - so what I end up doing is a
>>> group by in the SQL, then converting the result to a stream along with a
>>> boolean field that marks whether at least one of the events was the end
>>> event, then adding my custom trigger on top of it.
>>> It looks something like this:
>>>
>>> Table result = tableEnv.sqlQuery(
>>>     "SELECT atLeastOneTrue(lastEvent), sessionId, count(*) " +
>>>     "FROM source GROUP BY sessionId");
>>> tableEnv.toRetractStream(result, Row.class, streamQueryConfig)
>>>     .filter(tuple -> tuple.f0)
>>>     .map(...)
>>>     .returns(...)
>>>     .keyBy("sessionId")
>>>     .window(EventTimeSessionWindows.withGap(Time.hours(4)))
>>>     .trigger(new SessionEndedByTimeOrEndTrigger())
>>>     .process(...take last element from the group by result...)
>>>
>>> This seems like a weird workaround, doesn't it? My window is basically
>>> over the SQL result rather than over the source stream. Ideally I would
>>> key by the sessionId before running the SQL, but then a) would I need to
>>> register a table per key? b) would I be able to use the custom trigger
>>> per window?
>>>
>>> Basically I want to group by session id and have a window for every
>>> session that supports both time and a custom trigger. Assuming I need to
>>> use SQL (the reason is that the query is dynamically loaded), is there a
>>> better solution for it?
>>>
>>> --
>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
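The SessionEndedByTimeOrEndTrigger referenced above is never shown in the thread. A minimal sketch of what such a trigger could look like, with a hypothetical SessionRow case class standing in for the rows produced by the map(...) step:

import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Hypothetical element type; endedByLastEvent carries the flag computed by
// atLeastOneTrue(lastEvent) in the SQL.
case class SessionRow(sessionId: String, endedByLastEvent: Boolean)

class SessionEndedByTimeOrEndTrigger extends Trigger[SessionRow, TimeWindow] {

  override def onElement(element: SessionRow, timestamp: Long,
      window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
    if (element.endedByLastEvent) {
      // the special "last" event closes the session immediately
      TriggerResult.FIRE_AND_PURGE
    } else {
      // otherwise fall back to event time: fire once the watermark
      // passes the end of the (possibly merged) session window
      ctx.registerEventTimeTimer(window.maxTimestamp())
      TriggerResult.CONTINUE
    }
  }

  override def onEventTime(time: Long, window: TimeWindow,
      ctx: Trigger.TriggerContext): TriggerResult =
    if (time == window.maxTimestamp()) TriggerResult.FIRE_AND_PURGE
    else TriggerResult.CONTINUE

  override def onProcessingTime(time: Long, window: TimeWindow,
      ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit =
    ctx.deleteEventTimeTimer(window.maxTimestamp())

  // session windows merge, so the trigger must be mergeable
  override def canMerge(): Boolean = true

  override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): Unit =
    ctx.registerEventTimeTimer(window.maxTimestamp())
}

Firing on the flagged element works here only because the SQL layer has already aggregated each session and emits the flag on the updated row; the trigger merely decides when to release the windowed result.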