Thanks! I'm going to study this code closely!

Niels
On Tue, Dec 1, 2015 at 2:50 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi Niels!
>
> I have a pretty nice example for you here:
> https://github.com/StephanEwen/sessionization
>
> It keeps only one state and has the structure:
>
>     (source) --> (window sessions) ---> (real time sink)
>                         |
>                         +--> (15 minute files)
>
> The real time sink gets the event with the attached visitId immediately.
> The session operator, as a side effect, writes out the 15 minute files
> with the sessions that expired in that time.
>
> It is not a lot of code; the two main parts are:
>
> - the program and the program skeleton:
>   https://github.com/StephanEwen/sessionization/blob/master/src/main/java/com/dataartisans/streaming/sessionization/EventTimeSessionization.java
> - the sessionizing and file writing operator:
>   https://github.com/StephanEwen/sessionization/blob/master/src/main/java/com/dataartisans/streaming/sessionization/SessionizingOperator.java
>
> The example runs fully on event time, where the timestamps are extracted
> from the records. That makes this program very robust (no issues with
> clocks, etc.).
>
> Also, here comes the amazing part: the same program should do both
> "replay" and real time. The only difference is what input you give it.
> Since time is event time, it can do both.
>
> One note:
> - Event time watermarks are the mechanism to signal progress in event
>   time. It is simple here, because I assume that timestamps are ascending
>   within a Kafka partition. If that is not the case, you need to
>   implement a more elaborate TimestampExtractor.
>
> Hope you can work with this!
>
> Greetings,
> Stephan
>
> On Tue, Dec 1, 2015 at 1:00 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Just for clarification: the real-time results should also contain the
>> visitId, correct?
>>
>> On Tue, Dec 1, 2015 at 12:06 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Hi Niels!
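The watermark bookkeeping Stephan describes for ascending timestamps can be modeled without the Flink API. A minimal sketch, with all class and method names mine (the real Flink interface differs); the only assumption is one extractor instance per Kafka partition:

```java
// Sketch of the bookkeeping a timestamp extractor performs when
// timestamps are ascending within a partition: track the maximum
// timestamp seen, and let the watermark follow it directly.
public class AscendingWatermarks {

    private long maxSeenTimestamp = Long.MIN_VALUE;

    // Extract the event timestamp and remember the maximum seen so far.
    long extractTimestamp(long eventTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimestamp);
        return eventTimestamp;
    }

    // With ascending timestamps, all events up to the highest timestamp
    // are complete, so the watermark can trail it by one millisecond.
    long currentWatermark() {
        return maxSeenTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxSeenTimestamp - 1;
    }

    public static void main(String[] args) {
        AscendingWatermarks extractor = new AscendingWatermarks();
        for (long ts : new long[] {1000L, 2000L, 3500L}) {
            extractor.extractTimestamp(ts);
        }
        System.out.println(extractor.currentWatermark()); // prints 3499
    }
}
```

If timestamps are not ascending, this maximum-tracking approach still never lets the watermark regress, but late events below the watermark would need the more elaborate extractor Stephan mentions.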
>>>
>>> If you want to use the built-in windowing, you probably need two
>>> windows:
>>> - one for ID assignment (that immediately pipes elements through)
>>> - one for accumulating session elements, and then piping them into
>>>   files upon session end.
>>>
>>> You may be able to use the rolling file sink (roll by 15 minutes) to
>>> store the files. That is probably the simplest to implement and will
>>> serve the real time case.
>>>
>>>                                         +--> (real time sink)
>>>                                         |
>>>     (source) --> (window session ids) --+
>>>                                         |
>>>                                         +--> (window session) --> (rolling sink)
>>>
>>> You can put this all into one operator that accumulates the session
>>> elements but still immediately emits the new records (the realtime
>>> path), if you implement your own windowing/buffering in a custom
>>> function. This is also very easy to put onto event time, which makes it
>>> valuable for processing the history (replay). For this second case I am
>>> still prototyping some code, so give me a bit and I'll get back to
>>> you...
>>>
>>> Greetings,
>>> Stephan
>>>
>>> On Tue, Dec 1, 2015 at 10:55 AM, Niels Basjes <ni...@basjes.nl> wrote:
>>>
>>>> Hi Stephan,
>>>>
>>>> I created a first version of the visit ID assignment like this:
>>>>
>>>> First I group by sessionid and I create a Window per visit.
>>>> The custom Trigger for this window does a 'FIRE' after each element
>>>> and sets an event timer on the 'next possible moment the visit can
>>>> expire'. To avoid getting 'all events' in the visit after every 'FIRE'
>>>> I'm using CountEvictor.of(1). When the visit expires I do a PURGE, so
>>>> if there are more events afterwards for the same sessionId I get a new
>>>> visit (which is exactly what I want).
>>>>
>>>> The last step is that I want to have a 'normal' DataStream again to
>>>> work with.
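The visit-id logic that the FIRE/PURGE trigger implements can be sketched in plain Java, without the Flink windowing API: per sessionId, events within the inactivity gap share a visitId, and after the gap expires the next event opens a fresh visit. The gap length and the UUID-based id are illustrative assumptions, not taken from the thread:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Model of visit-id assignment with an inactivity timeout. Each event is
// handled immediately (the FIRE path); an expired visit is discarded and
// replaced (the PURGE path).
public class VisitIdAssigner {

    private static final long GAP_MILLIS = 30 * 60 * 1000L; // assumed timeout

    private static class Visit {
        final String visitId = UUID.randomUUID().toString();
        long lastEventTime;
    }

    private final Map<String, Visit> openVisits = new HashMap<>();

    // Returns the visitId for an event, opening a new visit when the
    // session was inactive for longer than the gap.
    public String assign(String sessionId, long eventTime) {
        Visit visit = openVisits.get(sessionId);
        if (visit == null || eventTime - visit.lastEventTime > GAP_MILLIS) {
            visit = new Visit();          // previous visit expired: new visit
            openVisits.put(sessionId, visit);
        }
        visit.lastEventTime = eventTime;  // every event extends the visit
        return visit.visitId;
    }

    public static void main(String[] args) {
        VisitIdAssigner assigner = new VisitIdAssigner();
        String a = assigner.assign("session-1", 0L);
        String b = assigner.assign("session-1", 60_000L);     // within gap
        String c = assigner.assign("session-1", 10_000_000L); // gap passed
        System.out.println(a.equals(b)); // prints true
        System.out.println(a.equals(c)); // prints false
    }
}
```

In the Flink version, the "gap passed" branch corresponds to the event-time timer firing and purging the window state for that key.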
>>>> I created this WindowFunction to map the windowed stream back to a
>>>> normal DataStream. Essentially I do this:
>>>>
>>>> DataStream<Foo> visitDataStream =
>>>>     visitWindowedStream.apply(new WindowToStream<Foo>());
>>>>
>>>> // This is an identity 'apply'
>>>> private static class WindowToStream<T>
>>>>         implements WindowFunction<T, T, String, GlobalWindow> {
>>>>     @Override
>>>>     public void apply(String s, GlobalWindow window,
>>>>                       Iterable<T> values, Collector<T> out) throws Exception {
>>>>         for (T value : values) {
>>>>             out.collect(value);
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> The problem with this is that I first create the visitIds in a Window
>>>> (great). Because I really need to have both the windowed events AND
>>>> the near-realtime version, I currently break down the Window to get
>>>> the single events, and after that I have to recreate the same Window
>>>> again.
>>>>
>>>> I'm looking forward to the implementation direction you are referring
>>>> to. I hope you have a better way of doing this.
>>>>
>>>> Niels Basjes
>>>>
>>>> On Mon, Nov 30, 2015 at 9:29 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>
>>>>> Hi Niels!
>>>>>
>>>>> Nice use case that you have!
>>>>> I think you can solve this super nicely with Flink, such that
>>>>> "replay" and "realtime" are literally the same program - they differ
>>>>> only in the input you give them.
>>>>>
>>>>> Event time is, like you said, the key thing for "replay". Event time
>>>>> depends on the progress in the timestamps of the data, so it can
>>>>> progress at different speeds, depending on what the rate of your
>>>>> stream is. With the appropriate data source, it will progress very
>>>>> fast in "replay mode", so that you replay at "fast forward speed",
>>>>> and it progresses at the same speed as processing time when you
>>>>> attach to the end of the Kafka queue.
>>>>>
>>>>> When you define the time intervals in your program to react to event
>>>>> time progress, then you will compute the right sessionization in both
>>>>> replay and real time settings.
>>>>>
>>>>> I am writing a little example code to share. The type of
>>>>> ID-assignment sessions you want to do needs an undocumented API right
>>>>> now, so I'll prepare something there for you...
>>>>>
>>>>> Greetings,
>>>>> Stephan
>>>>>
>>>>> On Sun, Nov 29, 2015 at 4:04 PM, Niels Basjes <ni...@basjes.nl> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> The sessionid is present in the measurements. It can also be seen as
>>>>>> a form of 'browser id'. Most websites use either a 'long lived
>>>>>> random value in a cookie' or an 'application session id' for this.
>>>>>>
>>>>>> So with the id of the browser in hand I need to group all events
>>>>>> into "periods of activity", which I call a visit.
>>>>>> Such a visit is a bounded subset of all events from a single browser.
>>>>>>
>>>>>> What I need is to add a (sort of) random visit id to the events; a
>>>>>> visit becomes 'inactive' after more than X minutes of inactivity.
>>>>>> I then want to add this visitid to each event and
>>>>>> 1) stream them out in realtime
>>>>>> 2) wait till the visit ends and store the complete visit on disk (I
>>>>>>    am going for either Avro or Parquet).
>>>>>>
>>>>>> I want to create disk files with all visits that ended in a specific
>>>>>> time period. So essentially:
>>>>>> "Group by round(<timestamp of last event>, 15 minutes)"
>>>>>>
>>>>>> Because of the need to be able to 'repair' things I came up with the
>>>>>> following question:
>>>>>> In the Flink API I see the 'process time' (i.e. the actual time of
>>>>>> the server) and the 'event time' (i.e. the time when an event was
>>>>>> recorded).
>>>>>>
>>>>>> Now in my case all events are in Kafka (for, say, 2 weeks).
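The "group by round(<timestamp of last event>, 15 minutes)" idea above boils down to one line of epoch-millis arithmetic. A minimal sketch (the helper name is mine, no Flink API involved):

```java
// Map the last-event timestamp of a visit to the start of its 15-minute
// bucket, so that all visits ending in the same quarter hour land in the
// same output file.
public class VisitBucket {

    private static final long FIFTEEN_MINUTES = 15 * 60 * 1000L;

    // Truncate an epoch-millis timestamp down to its 15-minute boundary.
    static long bucketOf(long lastEventMillis) {
        return lastEventMillis - (lastEventMillis % FIFTEEN_MINUTES);
    }

    public static void main(String[] args) {
        // Two visits ending 5 minutes apart fall into the same bucket.
        System.out.println(bucketOf(900_000L) == bucketOf(1_200_000L)); // prints true
    }
}
```

A rolling/bucketing file sink keyed on this value would then produce exactly one file (set) per 15-minute period of visit-end times.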
>>>>>> When something goes wrong I want to be able to 'reprocess'
>>>>>> everything from the start of the queue. Here the matter of 'event
>>>>>> time' becomes a big question for me: in those 'replay' situations
>>>>>> the event time will progress at a much higher speed than the normal
>>>>>> 1 sec/sec.
>>>>>>
>>>>>> How does this work in Apache Flink?
>>>>>>
>>>>>> Niels Basjes
>>>>>>
>>>>>> On Fri, Nov 27, 2015 at 3:28 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>>>
>>>>>>> Hey Niels!
>>>>>>>
>>>>>>> You may be able to implement this in windows anyways, depending on
>>>>>>> your setup. You can definitely implement state with timeout
>>>>>>> yourself (using the more low-level state interface), or you may be
>>>>>>> able to use custom windows for that (they can trigger on every
>>>>>>> element and return elements immediately, thereby giving you low
>>>>>>> latency).
>>>>>>>
>>>>>>> Can you tell me where exactly the session ID comes from? Is that
>>>>>>> something that the function with state generates itself?
>>>>>>> Depending on that answer, I can outline either the window or the
>>>>>>> custom state way...
>>>>>>>
>>>>>>> Greetings,
>>>>>>> Stephan
>>>>>>>
>>>>>>> On Fri, Nov 27, 2015 at 2:19 PM, Niels Basjes <ni...@basjes.nl> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Thanks for the explanation.
>>>>>>>> I have clickstream data arriving in realtime and I need to assign
>>>>>>>> the visitId and stream it out again (with the visitId now being
>>>>>>>> part of the record) into Kafka with the lowest possible latency.
>>>>>>>> Although the Window feature allows me to group and close the visit
>>>>>>>> on a timeout/expiry (as shown to me by Aljoscha in a separate
>>>>>>>> email), it does make a 'window'.
>>>>>>>>
>>>>>>>> So (as requested) I created a ticket for such a feature:
>>>>>>>> https://issues.apache.org/jira/browse/FLINK-3089
>>>>>>>>
>>>>>>>> Niels
>>>>>>>>
>>>>>>>> On Fri, Nov 27, 2015 at 11:51 AM, Stephan Ewen <se...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi Niels!
>>>>>>>>>
>>>>>>>>> Currently, state is released by setting the value for the key to
>>>>>>>>> null. If you are tracking web sessions, you can try to send an
>>>>>>>>> "end of session" element that sets the value to null.
>>>>>>>>>
>>>>>>>>> To be on the safe side, you probably want state that is
>>>>>>>>> automatically purged after a while. I would look into using
>>>>>>>>> windows for that. The triggers there are flexible, so you can
>>>>>>>>> schedule both actions on elements plus cleanup after a certain
>>>>>>>>> time delay (clock time or event time).
>>>>>>>>>
>>>>>>>>> The question about "state expiry" has come up a few times. People
>>>>>>>>> seem to like working on state directly, but it should clean up
>>>>>>>>> automatically.
>>>>>>>>>
>>>>>>>>> Can you see if your use case fits onto windows, and otherwise
>>>>>>>>> open a ticket for state expiry?
>>>>>>>>>
>>>>>>>>> Greetings,
>>>>>>>>> Stephan
>>>>>>>>>
>>>>>>>>> On Thu, Nov 26, 2015 at 10:42 PM, Niels Basjes <ni...@basjes.nl> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I'm working on a streaming application that ingests clickstream
>>>>>>>>>> data. In a specific part of the flow I need to retain a little
>>>>>>>>>> bit of state per visitor (i.e. keyBy(sessionid)).
>>>>>>>>>>
>>>>>>>>>> So I'm using the Key/Value state interface (i.e.
>>>>>>>>>> OperatorState<MyRecord>) in a map function.
>>>>>>>>>>
>>>>>>>>>> Now in my application I expect to get a huge number of sessions
>>>>>>>>>> per day.
>>>>>>>>>> Since these sessionids are 'random' and become unused after the
>>>>>>>>>> visitor leaves the website, over time the system will have seen
>>>>>>>>>> millions of those sessionids.
>>>>>>>>>>
>>>>>>>>>> So I was wondering: how are these OperatorStates cleaned?
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best regards / Met vriendelijke groeten,
>>>>>>>>>>
>>>>>>>>>> Niels Basjes

--
Best regards / Met vriendelijke groeten,

Niels Basjes
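Stephan's suggestion of state that is "automatically purged after a while" can be modeled in plain Java. This is a sketch with illustrative names (not the Flink state interface): a keyed store whose entries are dropped once no update arrived within a timeout, mimicking what a window trigger's cleanup timer, or setting the value to null, achieves:

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Keyed value store with inactivity-based expiry, modeling automatic
// cleanup of per-session state.
public class ExpiringKeyedState<V> {

    private final long timeoutMillis;
    private final Map<String, Entry<V>> state = new HashMap<>();

    private static class Entry<V> {
        V value;
        long lastUpdate;
    }

    public ExpiringKeyedState(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    public void update(String key, V value, long now) {
        Entry<V> e = state.computeIfAbsent(key, k -> new Entry<>());
        e.value = value;
        e.lastUpdate = now;
    }

    public V get(String key) {
        Entry<V> e = state.get(key);
        return e == null ? null : e.value;
    }

    // Called periodically (a trigger's timer would play this role):
    // release state for keys that have been inactive too long.
    public void purgeExpired(long now) {
        Iterator<Entry<V>> it = state.values().iterator();
        while (it.hasNext()) {
            if (now - it.next().lastUpdate > timeoutMillis) {
                it.remove();
            }
        }
    }

    public int size() {
        return state.size();
    }

    public static void main(String[] args) {
        ExpiringKeyedState<String> sessions = new ExpiringKeyedState<>(1_800_000L);
        sessions.update("session-1", "visit-a", 0L);
        sessions.update("session-2", "visit-b", 3_600_000L);
        sessions.purgeExpired(3_600_000L);   // session-1 is long inactive
        System.out.println(sessions.size()); // prints 1
    }
}
```

Without such a purge step, the map grows with every sessionid ever seen, which is exactly the concern raised in the original question.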