I think I understand. Since the entire session must fit into memory anyway, I'm going to try to create a new data structure which simply contains the 'entire session' and use a Window/BucketingSink construct to ship the sessions to files. I do need to ensure no one can OOM the system, by capping the maximum number of events in a session at a value that is only reached by robots (for which the whole idea of sessions is bogus anyway).
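The capping idea could look something like this minimal sketch in plain Java (outside any Flink API; the class name, event type, and cap value are all illustrative assumptions):

```java
import java.util.ArrayList;
import java.util.List;

/** Accumulates the events of one session, capped to guard against OOM from robot traffic. */
class CappedSession {
    // Cap chosen so that only bot-like traffic ever reaches it (value is an assumption).
    static final int MAX_EVENTS = 10_000;

    private final List<String> events = new ArrayList<>();
    private boolean truncated = false;

    /** Returns true if the event was stored, false once the cap has been reached. */
    boolean add(String event) {
        if (events.size() >= MAX_EVENTS) {
            truncated = true;  // mark the session so downstream analysis can discard it
            return false;
        }
        events.add(event);
        return true;
    }

    int size() { return events.size(); }

    /** True if at least one event was dropped because of the cap. */
    boolean isTruncated() { return truncated; }
}
```

Marking truncated sessions (rather than silently dropping events) keeps the "set of complete sessions" guarantee honest for the downstream consumers.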
Thanks for the tips.

Niels

On Tue, Jul 4, 2017 at 12:07 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> You are right. Building a large record might result in an OOME.
> But I think that would also happen with a regular SessionWindow,
> RocksDBStateBackend, and a WindowFunction that immediately ships the
> records it receives from the Iterable.
> As far as I know, a SessionWindow stores all elements in an internal
> ListState. Before iterating over a ListState, the RocksDBStateBackend
> completely deserializes the list into an ArrayList, i.e., all records of a
> session would be on the heap already.
>
> If this is a situation that you have to avoid, I think the only way to do
> so is to implement the sessionization yourself with a ProcessFunction and
> MapState (keyed on timestamp). This would be a non-trivial task, though.
>
> In case the SessionWindow + WindowFunction is OK for you (or you
> implemented your own sessionization logic, e.g., in a ProcessFunction), you
> could just forward the elements to a modified BucketingSink.
> Your version of the BucketingSink would need to ensure that files are not
> closed between checkpoints, i.e., a file may only be closed when a
> checkpoint barrier is received. Right now, files are closed on timer and/or
> file size.
> Since all records of a session are emitted by a single WindowFunction
> call, these records won't be interrupted by a barrier. Hence, you'll have a
> "consistent" state for all windows when a checkpoint is triggered.
>
> I'm afraid I'm not aware of a simpler solution for this use case.
>
> Hope it helps,
> Fabian
>
> 2017-07-04 11:24 GMT+02:00 Niels Basjes <ni...@basjes.nl>:
>
>> Hi Fabian,
>>
>> On Fri, Jun 30, 2017 at 6:27 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> If I understand your use case correctly, you'd like to hold back all
>>> events of a session until it ends/times out and then write all events out.
>>> So, instead of aggregating per session (the common use case), you'd just
>>> like to collect the events.
>>
>> Yes, and I want to write the completed sessions into files. No
>> aggregation or filtering at all.
>> The idea is that our data science guys who want to analyze sessions have
>> a much easier task if they know for certain that they have 'a set of
>> complete sessions'.
>>
>>> I would implement a simple WindowFunction that just forwards all events
>>> that it receives from the iterator. Conceptually, the window will just
>>> collect the events and emit them when the session ends/times out.
>>> Then you can add a BucketingSink which writes out the events. I'm not
>>> sure if the BucketingSink supports buckets based on event time, though.
>>> Maybe you would need to adapt it a bit to guarantee that all rows of the
>>> same session are written to the same file.
>>> Alternatively, the WindowFunction could also emit one large record which
>>> is a List or Array of events belonging to the same session.
>>
>> That last one was the idea I had.
>> Have a window function that keeps the events until the session is
>> finished, then output them with the event time of the 'end of the
>> session' and use the BucketingSink to write those to disk.
>>
>> The problem (in my mind) that I have with this is that a single session
>> with a LOT of events would bring the system to a halt because it can
>> trigger OOM events.
>>
>> How should I handle those?
>>
>> Niels

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
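For reference, the core of the sessionization logic discussed in this thread (splitting a timestamp-ordered event stream into sessions separated by an inactivity gap) can be sketched in plain Java, independent of the Flink API. The class name, the use of bare timestamps as "events", and the gap value in the example are all illustrative assumptions, not the actual implementation:

```java
import java.util.ArrayList;
import java.util.List;

/** Groups timestamp-ordered events into sessions split by an inactivity gap. */
class Sessionizer {
    static List<List<Long>> sessionize(List<Long> timestamps, long gapMillis) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long last = Long.MIN_VALUE;
        for (long ts : timestamps) {
            // A silence longer than gapMillis closes the current session.
            if (!current.isEmpty() && ts - last > gapMillis) {
                sessions.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
            last = ts;
        }
        if (!current.isEmpty()) {
            sessions.add(current);  // flush the final, still-open session
        }
        return sessions;
    }
}
```

A Flink SessionWindow does essentially this per key, with the added complexity of out-of-order events, state backends, and checkpointing, which is why Fabian calls a hand-rolled ProcessFunction + MapState version non-trivial.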