Hi Aljoscha,

Yes, the same user ID can originate from different sources. You are right: it would not be possible to guarantee ordering if you considered user IDs across sources. However, when I key by source ID, all the user IDs are isolated within their source ID. So I believe it should be fine. At least I get the results I expect.
Best,
Samir

2016-11-09 13:43 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:

> Hi Samir,
> can events with the same user ID originate from different sources? If yes,
> then doing things based on changes in the user ID is problematic because
> there are no ordering guarantees.
>
> Cheers,
> Aljoscha
>
> On Tue, 8 Nov 2016 at 19:59 Samir Abdou <abdou.samir.mail...@gmail.com>
> wrote:
>
>> Hi Aljoscha,
>>
>> Thanks for the question.
>>
>> I key by source ID because I want to isolate users per source. If I
>> keyed by user ID, I would need logic to create sessions based on
>> time. But I would like to create my sessions based on user ID changes in
>> the event stream of each source.
>>
>> Cheers,
>> Samir
>>
>> 2016-11-07 18:04 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>>
>> Hi,
>> why are you keying by the source ID and not by the user ID?
>>
>> Cheers,
>> Aljoscha
>>
>> On Mon, 7 Nov 2016 at 15:42 Till Rohrmann <trohrm...@apache.org> wrote:
>>
>> Hi Samir,
>>
>> the windowing API in Flink works the following way: first, an incoming
>> element is assigned to a window. This is defined in the window clause,
>> where you create a GlobalWindow. Thus, all elements for the same sourceId
>> will be assigned to the same window. Next, the element is given to a
>> Trigger, which decides whether the window should be evaluated or not. But
>> at this point the element is already part of the window. That's why the
>> last element of your window has a different ID.
>>
>> What you could try instead is a MergingWindowAssigner, which creates
>> windows whose elements all have the same ID: all elements with the same
>> ID are assigned to the same session window. The session windows are then
>> triggered by event time, for example. That's the recommended way to
>> create session windows with Flink. Here is some documentation for session
>> windows [1].
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/windows.html#session-windows
>>
>> Cheers,
>> Till
>>
>> On Sun, Nov 6, 2016 at 12:11 PM, Samir Abdou <
>> abdou.samir.mail...@gmail.com> wrote:
>>
>> I am using Flink 1.2-SNAPSHOT. My data looks like the following:
>>
>> - id=25398102, sourceId=1, ts=2016-10-15 00:00:56, user=14, value=919
>> - id=25398185, sourceId=1, ts=2016-10-15 00:01:06, user=14, value=920
>> - id=25398210, sourceId=1, ts=2016-10-15 00:01:16, user=14, value=944
>> - id=25398235, sourceId=1, ts=2016-10-15 00:01:24, user=3149, value=944
>> - id=25398236, sourceId=1, ts=2016-10-15 00:01:25, user=71, value=955
>> - id=25398239, sourceId=1, ts=2016-10-15 00:01:26, user=71, value=955
>> - id=25398265, sourceId=1, ts=2016-10-15 00:01:36, user=71, value=955
>> - id=25398310, sourceId=1, ts=2016-10-15 00:02:16, user=14, value=960
>> - id=25398320, sourceId=1, ts=2016-10-15 00:02:26, user=14, value=1000
>>
>> I am running the following code to create windows based on user IDs:
>>
>>     stream.flatMap(new LogsParser())
>>         .assignTimestampsAndWatermarks(new MessageTimestampExtractor())
>>         .keyBy("sourceId")
>>         .window(GlobalWindows.create())
>>         .trigger(PurgingTrigger.of(new MySessionTrigger()))
>>         .apply(new SessionWindowFunction())
>>         .print();
>>
>> MySessionTrigger looks at each received event and checks the user ID in
>> order to fire the window when the user ID changes. The
>> SessionWindowFunction just creates a session out of the window.
>>
>> Here are the sessions created:
>>
>> 1. Session:
>>    - id=25398102, sourceId=1, ts=2016-10-15 00:00:56, user=14, value=919
>>    - id=25398185, sourceId=1, ts=2016-10-15 00:01:06, user=14, value=920
>>    - id=25398210, sourceId=1, ts=2016-10-15 00:01:16, user=14, value=944
>>    - id=25398235, sourceId=1, ts=2016-10-15 00:01:24, user=3149, value=944
>> 2. Session:
>>    - id=25398236, sourceId=1, ts=2016-10-15 00:01:25, user=71, value=955
>>    - id=25398239, sourceId=1, ts=2016-10-15 00:01:26, user=71, value=955
>>    - id=25398265, sourceId=1, ts=2016-10-15 00:01:36, user=71, value=955
>>    - id=25398310, sourceId=1, ts=2016-10-15 00:02:16, user=14, value=960
>> 3. Session:
>>    - id=25398320, sourceId=1, ts=2016-10-15 00:02:26, user=14, value=1000
>>
>> The problem, as you can see, is that in every session the last event
>> actually belongs to the next window. The decision to fire the window
>> comes too late, because the last event is already in the window.
>>
>> How can I fire the window without including that last event?
>>
>> Thanks for your help.
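To make Till's explanation concrete, here is a toy model in plain Java (no Flink dependency, so it is not Flink code and not Samir's implementation). It assumes that MySessionTrigger, which is not shown in the thread, remembers the first user of the current window per source and forgets it when the window is purged; under that assumption, assignThenTrigger reproduces the three sessions reported above, because each event is added to the window before the trigger inspects it. splitBeforeAdd is a hypothetical alternative that checks for a user change before adding the event, so the event that starts a new user's run opens the next session instead of closing the previous one. Event, assignThenTrigger, splitBeforeAdd, flushRemaining, sampleEvents and idsOf are illustrative names; the sketch needs Java 16 or later (records, Stream.toList()).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model in plain Java (no Flink dependency). All names here are hypothetical,
// chosen only to illustrate the ordering issue discussed in the thread.
public class UserChangeSessions {

    // Only the fields the session logic needs.
    record Event(long id, int sourceId, int user) {}

    // Order of operations as Till describes it: the event is added to its key's window
    // first, and only then does the trigger look at it. ASSUMPTION: the trigger remembers
    // the first user of the current window per sourceId and forgets it on purge.
    static List<List<Event>> assignThenTrigger(List<Event> events) {
        List<List<Event>> sessions = new ArrayList<>();
        Map<Integer, List<Event>> windows = new LinkedHashMap<>();  // one buffer per sourceId
        Map<Integer, Integer> triggerUser = new LinkedHashMap<>();  // trigger state per sourceId
        for (Event e : events) {
            List<Event> window = windows.computeIfAbsent(e.sourceId(), k -> new ArrayList<>());
            window.add(e);  // 1. assign: the event is already in the window...
            Integer seen = triggerUser.putIfAbsent(e.sourceId(), e.user());
            if (seen != null && seen.intValue() != e.user()) {  // 2. ...when the trigger decides
                sessions.add(new ArrayList<>(window));  // FIRE: includes the new user's event
                window.clear();                         // PURGE the window contents
                triggerUser.remove(e.sourceId());       // assumed reset of trigger state
            }
        }
        flushRemaining(windows, sessions);  // assumed stand-in for the final firing
        return sessions;
    }

    // Hypothetical alternative: check for a user change BEFORE adding the event, so the
    // event that starts a new user's run opens the next session instead of closing this one.
    static List<List<Event>> splitBeforeAdd(List<Event> events) {
        List<List<Event>> sessions = new ArrayList<>();
        Map<Integer, List<Event>> open = new LinkedHashMap<>();  // open session per sourceId
        for (Event e : events) {
            List<Event> session = open.computeIfAbsent(e.sourceId(), k -> new ArrayList<>());
            if (!session.isEmpty() && session.get(0).user() != e.user()) {
                sessions.add(new ArrayList<>(session));  // emit without the new event
                session.clear();
            }
            session.add(e);
        }
        flushRemaining(open, sessions);
        return sessions;
    }

    // Emits whatever is still buffered when the input ends.
    static void flushRemaining(Map<Integer, List<Event>> buffers, List<List<Event>> sessions) {
        for (List<Event> buffer : buffers.values()) {
            if (!buffer.isEmpty()) {
                sessions.add(new ArrayList<>(buffer));
            }
        }
    }

    // ids and users of the nine sample events from the thread (all from sourceId=1).
    static List<Event> sampleEvents() {
        long[] ids = {25398102, 25398185, 25398210, 25398235, 25398236,
                      25398239, 25398265, 25398310, 25398320};
        int[] users = {14, 14, 14, 3149, 71, 71, 71, 14, 14};
        List<Event> events = new ArrayList<>();
        for (int i = 0; i < ids.length; i++) {
            events.add(new Event(ids[i], 1, users[i]));
        }
        return events;
    }

    // Reduces each session to its event ids for easy printing and comparison.
    static List<List<Long>> idsOf(List<List<Event>> sessions) {
        return sessions.stream().map(s -> s.stream().map(Event::id).toList()).toList();
    }

    public static void main(String[] args) {
        List<Event> events = sampleEvents();
        System.out.println(idsOf(assignThenTrigger(events)));
        System.out.println(idsOf(splitBeforeAdd(events)));
    }
}
```

Running main prints the event ids grouped per session for both variants: the first line matches the three sessions in the original message, while the second yields four sessions in which every event shares its session's user (14, 3149, 71, 14). This only models the ordering issue; it is not a drop-in replacement for the Flink pipeline.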