Hi Aljoscha,

Thanks for the question.

I key by source ID because I want to isolate users per source. If I keyed
by user ID, I would need logic to create sessions based on time. Instead, I
would like to create my sessions based on user ID changes in the event
stream of each source.
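
To make the goal concrete, here is a minimal sketch in plain Java of the
session boundaries I am after. It does not use any Flink API. The class,
the Event type and the sessionize method are hypothetical names made up for
this illustration, and it assumes the events of one source arrive in order.
The user ID is checked before the event is added, so the event that reveals
the change opens the next session instead of ending up in the current one.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: plain Java, no Flink classes involved.
final class UserChangeSessions {

    /** Hypothetical event type carrying only the fields the sketch needs. */
    public static final class Event {
        public final long id;
        public final int sourceId;
        public final int user;

        public Event(long id, int sourceId, int user) {
            this.id = id;
            this.sourceId = sourceId;
            this.user = user;
        }
    }

    /**
     * Splits the ordered events of ONE source into sessions.
     * A new session starts whenever the user ID differs from the
     * user ID of the session currently being built.
     */
    public static List<List<Event>> sessionize(List<Event> events) {
        List<List<Event>> sessions = new ArrayList<>();
        List<Event> current = new ArrayList<>();
        for (Event e : events) {
            // Decide BEFORE adding: a different user closes the current session.
            if (!current.isEmpty() && current.get(0).user != e.user) {
                sessions.add(current);
                current = new ArrayList<>();
            }
            current.add(e);
        }
        // Emit the session that is still open at the end of the input.
        if (!current.isEmpty()) {
            sessions.add(current);
        }
        return sessions;
    }

    public static void main(String[] args) {
        // IDs and users taken from the sample data in my original mail.
        List<Event> events = List.of(
                new Event(25398102L, 1, 14),
                new Event(25398185L, 1, 14),
                new Event(25398210L, 1, 14),
                new Event(25398235L, 1, 3149),
                new Event(25398236L, 1, 71),
                new Event(25398239L, 1, 71),
                new Event(25398265L, 1, 71),
                new Event(25398310L, 1, 14),
                new Event(25398320L, 1, 14));

        for (List<Event> session : sessionize(events)) {
            StringBuilder ids = new StringBuilder();
            for (Event e : session) {
                ids.append(' ').append(e.id);
            }
            System.out.println("user=" + session.get(0).user + " ids:" + ids);
        }
    }
}
```

For the sample data this gives four sessions with 3, 1, 3 and 2 events
(users 14, 3149, 71 and 14), which is the grouping I would like to get out
of the window per source.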

Cheers,
Samir

2016-11-07 18:04 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:

> Hi,
> why are you keying by the source ID and not by the user ID?
>
> Cheers,
> Aljoscha
>
> On Mon, 7 Nov 2016 at 15:42 Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Samir,
>>
>> the windowing API in Flink works the following way: First an incoming
>> element is assigned to a window. This is defined in the window clause where
>> you create a GlobalWindow. Thus, all elements for the same sourceId will be
>> assigned to the same window. Next, the element is given to a Trigger which
>> decides whether the window shall be evaluated or not. But at this point the
>> element is already part of the window. That's why the last element of your
>> window has a different ID.
>>
>> What you could try to use is the MergingWindowAssigner to create windows
>> whose elements all have the same ID. There you assign all elements with the
>> same ID to the same session window. The session windows are then triggered
>> by event time, for example. That's the recommended way to create session
>> windows with Flink. Here is some documentation for session windows [1].
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/windows.html#session-windows
>>
>> Cheers,
>> Till
>>
>> On Sun, Nov 6, 2016 at 12:11 PM, Samir Abdou <
>> abdou.samir.mail...@gmail.com> wrote:
>>
>> I am using Flink 1.2-Snapshot. My data looks like the following:
>>
>>    - id=25398102, sourceId=1, ts=2016-10-15 00:00:56, user=14, value=919
>>    - id=25398185, sourceId=1, ts=2016-10-15 00:01:06, user=14, value=920
>>    - id=25398210, sourceId=1, ts=2016-10-15 00:01:16, user=14, value=944
>>    - id=25398235, sourceId=1, ts=2016-10-15 00:01:24, user=3149, value=944
>>    - id=25398236, sourceId=1, ts=2016-10-15 00:01:25, user=71, value=955
>>    - id=25398239, sourceId=1, ts=2016-10-15 00:01:26, user=71, value=955
>>    - id=25398265, sourceId=1, ts=2016-10-15 00:01:36, user=71, value=955
>>    - id=25398310, sourceId=1, ts=2016-10-15 00:02:16, user=14, value=960
>>    - id=25398320, sourceId=1, ts=2016-10-15 00:02:26, user=14, value=1000
>>
>> I am running the following code to create windows based on user IDs:
>>
>>     stream.flatMap(new LogsParser())
>>             .assignTimestampsAndWatermarks(new MessageTimestampExtractor())
>>             .keyBy("sourceId")
>>             .window(GlobalWindows.create())
>>             .trigger(PurgingTrigger.of(new MySessionTrigger()))
>>             .apply(new SessionWindowFunction())
>>             .print();
>>
>> MySessionTrigger inspects each received event and checks its user ID,
>> firing the window when the user ID changes. The SessionWindowFunction just
>> creates a session out of the window.
>>
>> Here are the sessions created:
>>
>>    1. Session:
>>       - id=25398102, sourceId=1, ts=2016-10-15 00:00:56, user=14, value=919
>>       - id=25398185, sourceId=1, ts=2016-10-15 00:01:06, user=14, value=920
>>       - id=25398210, sourceId=1, ts=2016-10-15 00:01:16, user=14, value=944
>>       - id=25398235, sourceId=1, ts=2016-10-15 00:01:24, user=3149, value=944
>>
>>    2. Session:
>>       - id=25398236, sourceId=1, ts=2016-10-15 00:01:25, user=71, value=955
>>       - id=25398239, sourceId=1, ts=2016-10-15 00:01:26, user=71, value=955
>>       - id=25398265, sourceId=1, ts=2016-10-15 00:01:36, user=71, value=955
>>       - id=25398310, sourceId=1, ts=2016-10-15 00:02:16, user=14, value=960
>>
>>    3. Session:
>>       - id=25398320, sourceId=1, ts=2016-10-15 00:02:26, user=14, value=1000
>>
>> The problem, as you can see, is that the last event in every triggered
>> session actually belongs to the next one. The decision to trigger the
>> window comes too late, because the event that causes it is already in the
>> window.
>>
>> How can I trigger the window without considering the last event in that
>> window?
>>
>> Thanks for your help.
>>
>>
>>
