Hi Aljoscha,

Thanks for the question.

I key by source ID because I want to isolate users per source. If I keyed
by user ID, I would need logic to create sessions based on time. Instead,
I would like to create my sessions based on user ID changes in the event
stream of each source.

Cheers,
Samir

2016-11-07 18:04 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:

> Hi,
> why are you keying by the source ID and not by the user ID?
>
> Cheers,
> Aljoscha
>
> On Mon, 7 Nov 2016 at 15:42 Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Samir,
>>
>> the windowing API in Flink works the following way: First an incoming
>> element is assigned to a window. This is defined in the window clause where
>> you create a GlobalWindow. Thus, all elements for the same sourceId will be
>> assigned to the same window. Next, the element is given to a Trigger which
>> decides whether the window shall be evaluated or not. But at this point the
>> element is already part of the window. That's why the last element of your
>> window has a different ID.
>>
>> What you could try to use is the MergingWindowAssigner to create windows
>> whose elements all have the same ID. There you assign all elements with the
>> same ID to the same session window. The session windows are then triggered
>> by event time, for example. That's the recommended way to create session
>> windows with Flink. Here is some documentation for session windows [1].
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/windows.html#session-windows
>>
>> Cheers,
>> Till
>>
>> On Sun, Nov 6, 2016 at 12:11 PM, Samir Abdou <
>> abdou.samir.mail...@gmail.com> wrote:
>>
>> I am using Flink 1.2-Snapshot.
>> My data looks like the following:
>>
>> - id=25398102, sourceId=1, ts=2016-10-15 00:00:56, user=14, value=919
>> - id=25398185, sourceId=1, ts=2016-10-15 00:01:06, user=14, value=920
>> - id=25398210, sourceId=1, ts=2016-10-15 00:01:16, user=14, value=944
>> - id=25398235, sourceId=1, ts=2016-10-15 00:01:24, user=3149, value=944
>> - id=25398236, sourceId=1, ts=2016-10-15 00:01:25, user=71, value=955
>> - id=25398239, sourceId=1, ts=2016-10-15 00:01:26, user=71, value=955
>> - id=25398265, sourceId=1, ts=2016-10-15 00:01:36, user=71, value=955
>> - id=25398310, sourceId=1, ts=2016-10-15 00:02:16, user=14, value=960
>> - id=25398320, sourceId=1, ts=2016-10-15 00:02:26, user=14, value=1000
>>
>> I am running the following code to create windows based on user IDs:
>>
>> stream.flatMap(new LogsParser())
>>     .assignTimestampsAndWatermarks(new MessageTimestampExtractor())
>>     .keyBy("sourceId")
>>     .window(GlobalWindows.create())
>>     .trigger(PurgingTrigger.of(new MySessionTrigger()))
>>     .apply(new SessionWindowFunction())
>>     .print();
>>
>> MySessionTrigger looks at each received event and checks the user ID to
>> trigger the window on user ID changes. The SessionWindowFunction just
>> creates a session out of the window.
>>
>> Here are the sessions created:
>>
>> 1. Session:
>>    - id=25398102, sourceId=1, ts=2016-10-15 00:00:56, user=14, value=919
>>    - id=25398185, sourceId=1, ts=2016-10-15 00:01:06, user=14, value=920
>>    - id=25398210, sourceId=1, ts=2016-10-15 00:01:16, user=14, value=944
>>    - id=25398235, sourceId=1, ts=2016-10-15 00:01:24, user=3149, value=944
>>
>> 2. Session:
>>    - id=25398236, sourceId=1, ts=2016-10-15 00:01:25, user=71, value=955
>>    - id=25398239, sourceId=1, ts=2016-10-15 00:01:26, user=71, value=955
>>    - id=25398265, sourceId=1, ts=2016-10-15 00:01:36, user=71, value=955
>>    - id=25398310, sourceId=1, ts=2016-10-15 00:02:16, user=14, value=960
>>
>> 3. Session:
>>    - id=25398320, sourceId=1, ts=2016-10-15 00:02:26, user=14, value=1000
>>
>> The problem, as you can see, is that in every session the last event
>> actually belongs to the next window. The decision to trigger the window
>> comes too late, because the last event is already in the window.
>>
>> How can I trigger the window without considering the last event in that
>> window?
>>
>> Thanks for your help.
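
For reference, the grouping asked for in the quoted message (close the session before the event with the new user ID is added to it) can be sketched in plain Java, independent of the windowing API. This is only an illustration under assumptions: `SessionSplitter`, `Event` and `split` are hypothetical names, the events are assumed to belong to a single source and to arrive in timestamp order, and nothing here is Flink API. In a Flink job, equivalent logic could presumably live in a stateful function on the stream keyed by sourceId rather than in a trigger, since the trigger only sees an element after it has been assigned to the window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SessionSplitter {

    /** Hypothetical event type carrying only the fields needed for the sketch. */
    public static final class Event {
        final long id;
        final int sourceId;
        final int user;

        public Event(long id, int sourceId, int user) {
            this.id = id;
            this.sourceId = sourceId;
            this.user = user;
        }
    }

    /**
     * Splits the events of ONE source (assumed to be in timestamp order)
     * into sessions of consecutive events with the same user ID.
     * The event that carries a new user ID opens the next session and is
     * never appended to the session it closes.
     */
    public static List<List<Event>> split(List<Event> events) {
        List<List<Event>> sessions = new ArrayList<>();
        List<Event> current = new ArrayList<>();
        for (Event e : events) {
            // Check the user ID BEFORE adding the event to the open session.
            if (!current.isEmpty() && current.get(0).user != e.user) {
                sessions.add(current);
                current = new ArrayList<>();
            }
            current.add(e);
        }
        // The last open session is only emitted at the end of the input.
        if (!current.isEmpty()) {
            sessions.add(current);
        }
        return sessions;
    }

    public static void main(String[] args) {
        // IDs and user IDs taken from the sample data in this thread.
        List<Event> events = Arrays.asList(
            new Event(25398102L, 1, 14),
            new Event(25398185L, 1, 14),
            new Event(25398210L, 1, 14),
            new Event(25398235L, 1, 3149),
            new Event(25398236L, 1, 71),
            new Event(25398239L, 1, 71),
            new Event(25398265L, 1, 71),
            new Event(25398310L, 1, 14),
            new Event(25398320L, 1, 14));
        for (List<Event> session : split(events)) {
            System.out.println("user=" + session.get(0).user
                + " events=" + session.size());
        }
    }
}
```

On the sample data this yields four sessions (user 14 with three events, user 3149 with one, user 71 with three, user 14 with two). Note that on an unbounded stream the last open session is never closed by a user ID change alone, so a real job would likely still need some timeout to flush it.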