Hi Shahar,

A table function takes a single row as input but can output multiple
rows. You can split the row based on the "last" event. The code looks
like this:

    val sessionResult =
      "SELECT lastUDAF(line) AS lastEvents " +
      "FROM MyTable " +
      "GROUP BY SESSION(rowtime, INTERVAL '4' HOUR)"
    val result =
      s"SELECT lastEvent FROM ($sessionResult), " +
      "LATERAL TABLE(splitUDTF(lastEvents)) AS T(lastEvent)"


The lastUDAF is used to process the data in a session window. Since
your last event is determined either by the window end or by a special
"last" event, the lastUDAF may collect multiple last events per window.
After the window, we apply a splitUDTF to split the lastEvents back
into individual events.
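
For reference, here is a minimal sketch of what the two functions could
look like. The comma-joined String payload, the accumulator layout, and
the isLast check are illustrative assumptions; only the names lastUDAF
and splitUDTF come from the query above:

    import org.apache.flink.table.functions.{AggregateFunction, TableFunction}

    // Accumulator: collects the events flagged as "last" within one session.
    class LastAcc {
      var events: java.util.List[String] = new java.util.ArrayList[String]()
    }

    // lastUDAF: aggregates a session window into one comma-joined string
    // holding every "last" event seen in that window.
    class LastUDAF extends AggregateFunction[String, LastAcc] {
      override def createAccumulator(): LastAcc = new LastAcc
      override def getValue(acc: LastAcc): String = String.join(",", acc.events)

      def accumulate(acc: LastAcc, line: String): Unit = {
        if (isLast(line)) acc.events.add(line)
      }

      // Placeholder: replace with however you mark a "last" event.
      private def isLast(line: String): Boolean = line.endsWith("LAST")
    }

    // splitUDTF: turns the joined string back into one row per event.
    class SplitUDTF extends TableFunction[String] {
      def eval(lastEvents: String): Unit =
        lastEvents.split(",").foreach(e => collect(e))
    }

Both functions would have to be registered before the SQL can reference
them, e.g. tableEnv.registerFunction("lastUDAF", new LastUDAF()) and
tableEnv.registerFunction("splitUDTF", new SplitUDTF()).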

Best, Hequn


On Wed, Oct 17, 2018 at 12:38 AM Shahar Cizer Kobrinsky <
shahar.kobrin...@gmail.com> wrote:

> I'm wondering how that works. It seems that a table function still
> takes a single row's values as input, am I wrong (or at least that is
> how the examples show it)?
> What would the SQL look like?
>
> On Fri, Oct 12, 2018 at 9:15 PM Hequn Cheng <chenghe...@gmail.com> wrote:
>
>> Hi shkob1,
>>
>> > While one is time (session inactivity), the other is based on a
>> > specific event marked as a "last" event.
>> How about using a session window and a udtf[1] to solve the problem? The
>> session window may output multiple `last` elements, but we can use a udtf
>> to split them into single ones. Thus, we can use SQL for the whole job.
>>
>> Best, Hequn.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-functions
>>
>> On Sat, Oct 13, 2018 at 2:28 AM shkob1 <shahar.kobrin...@gmail.com>
>> wrote:
>>
>>> Hey!
>>>
>>> I have a use case in which I'm grouping a stream by session id - so
>>> far pretty standard; note that I need to do it through SQL and not
>>> the Table API.
>>> In my use case I have two trigger conditions though - while one is
>>> time (session inactivity), the other is based on a specific event
>>> marked as a "last" event.
>>> AFAIK SQL does not support custom triggers - so what I end up doing
>>> is a group by in the SQL - then converting the result to a stream
>>> along with a boolean field that marks whether at least one of the
>>> events was the end event - then adding my custom trigger on top of
>>> it.
>>> It looks something like this:
>>>
>>> Table result = tableEnv.sqlQuery(
>>>     "SELECT atLeastOneTrue(lastEvent), sessionId, count(*) " +
>>>     "FROM source GROUP BY sessionId");
>>> tableEnv.toRetractStream(result, Row.class, streamQueryConfig)
>>>         .filter(tuple -> tuple.f0)
>>>         .map(...)
>>>         .returns(...)
>>>         .keyBy("sessionId")
>>>         .window(EventTimeSessionWindows.withGap(Time.hours(4)))
>>>         .trigger(new SessionEndedByTimeOrEndTrigger())
>>>         .process(...take last element from the group by result...)
>>>
>>> This seems like a weird workaround, doesn't it? My window is
>>> basically on the SQL result rather than on the source stream.
>>> Ideally I would keyBy the sessionId before running the SQL, but then
>>> a) would I need to register a table per key? b) would I be able to
>>> use the custom trigger per window?
>>>
>>> Basically I want to group by session id and have a window for every
>>> session that supports both time and a custom trigger. Assuming I
>>> need to use SQL (the reason is that the query is dynamically
>>> loaded), is there a better solution for it?
