Hi Stephen,

A window is created with the first record that is assigned to it. If the windows are based on time and a key, then no window will be created (and no space will be occupied) if there is no first record for a key and time interval.
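To make the point about lazily created windows concrete, here is a minimal plain-Python model of keyed tumbling windows. It is not Flink code and not how Flink stores window state internally; the class `LazyTumblingWindows` and its methods are hypothetical. All it is meant to show is that an entry exists for a (key, window) pair only after the first record for that pair arrives, and that the entry is purged when the window fires.

```python
class LazyTumblingWindows:
    """Hypothetical toy model of keyed tumbling windows (not Flink's API):
    state for a (key, window) pair exists only after its first record."""

    def __init__(self, size_ms):
        self.size_ms = size_ms
        self.state = {}  # (key, window_start) -> number of records seen

    def window_start(self, timestamp_ms):
        # Tumbling windows are aligned to multiples of the window size.
        return timestamp_ms - (timestamp_ms % self.size_ms)

    def add(self, key, timestamp_ms):
        slot = (key, self.window_start(timestamp_ms))
        # The first record for this key and interval creates the entry.
        self.state[slot] = self.state.get(slot, 0) + 1

    def fire(self, window_start):
        # Emit the results of one window and purge its entries.
        fired = {k: n for (k, w), n in self.state.items() if w == window_start}
        for key in fired:
            del self.state[(key, window_start)]
        return fired


windows = LazyTumblingWindows(size_ms=60_000)
windows.add("ca:fe:ba:be", 1_000)
windows.add("ca:fe:ba:be", 2_000)
windows.add("de:ad:be:ef", 61_000)
print(len(windows.state))  # 2
print(windows.fire(0))     # {'ca:fe:ba:be': 2}
print(len(windows.state))  # 1
```

No entry is ever allocated for `de:ad:be:ef` in the first window or for `ca:fe:ba:be` in the second, because neither key received a record there.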
Anyway, if tracking the number of open files and the average time a file stays open is your use case, you might want to implement the logic with a ProcessFunction instead of a window. The reason is that time windows don't share state, i.e., the information about an opened but not yet closed file would not be "carried over" to the next window. However, if you use a ProcessFunction, you are responsible for cleaning up the state.

Hope this helps,
Fabian

On Sun, 10 Feb 2019 at 20:36, Stephen Connolly <stephen.alan.conno...@gmail.com> wrote:

>
> On Sun, 10 Feb 2019 at 09:09, Chesnay Schepler <ches...@apache.org> wrote:
>
>> This sounds reasonable to me.
>>
>> I'm a bit confused by this question: "*Additionally, I am (naïvely)
>> hoping that if a window has no events for a particular key, the
>> memory/storage costs are zero for that key.*"
>>
>> Are you asking whether a key that was received in window X (as part of an
>> event) is still present in window X+1? If so, then the answer is no; a key
>> will only be present in a given window if an event was received that fits
>> into that window.
>>
>
> To confirm:
>
> So let's say I'm tracking the average time a file is opened in folders.
>
> In window N we get the events:
>
> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
> {"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}
> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User guide.txt"}
> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin guide.txt"}
>
> So there will be aggregates stored for
> ("ca:fe:ba:be","/"), ("ca:fe:ba:be","/foo"), ("ca:fe:ba:be","/foo/bar"),
> ("ca:fe:ba:be","/foo/bar/README.txt"), etc.
>
> In window N+1 we do not get any events at all.
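Fabian's ProcessFunction suggestion can be illustrated with a plain-Python model of keyed state that lives across window boundaries. The method names loosely mirror the `processElement`/`onTimer` hooks of Flink's `KeyedProcessFunction`, but this is not the Flink API: the class name `OpenFileTracker`, the `ttl` cleanup timeout, and the in-memory timer heap are assumptions made for the sketch. The cleanup timer is the part Fabian warns about: without it, state for a file whose close event never arrives would be kept forever.

```python
import heapq


class OpenFileTracker:
    """Hypothetical model of a keyed ProcessFunction (plain Python, not Flink):
    open-file state survives across windows and is cleaned up by timers."""

    def __init__(self, ttl):
        self.ttl = ttl        # assumed timeout after which a never-closed file is dropped
        self.open_since = {}  # (source, path) -> timestamp of its "open" event
        self.timers = []      # min-heap of (fire_time, (source, path))

    def process_element(self, event, ts):
        """Return the open duration for a matched close, else None."""
        key = (event["source"], event["path"])
        if event["action"] == "open":
            self.open_since[key] = ts
            # Register a cleanup timer, similar in spirit to TimerService.registerEventTimeTimer.
            heapq.heappush(self.timers, (ts + self.ttl, key))
            return None
        # A close clears the state for this file, whichever window the open fell in.
        opened = self.open_since.pop(key, None)
        return None if opened is None else ts - opened

    def on_timers(self, now):
        """Fire all timers due at or before `now`, dropping stale open-file state."""
        while self.timers and self.timers[0][0] <= now:
            fire_time, key = heapq.heappop(self.timers)
            opened = self.open_since.get(key)
            # Ignore timers that belong to an earlier open of a since-reopened file.
            if opened is not None and opened + self.ttl <= fire_time:
                del self.open_since[key]

    def open_count(self, source):
        return sum(1 for src, _ in self.open_since if src == source)


tracker = OpenFileTracker(ttl=3_600)
src = "ca:fe:ba:be"
tracker.process_element({"source": src, "action": "open", "path": "/foo/bar/README.txt"}, 0)
tracker.process_element({"source": src, "action": "open", "path": "/foo/bar/User guide.txt"}, 10)
print(tracker.process_element({"source": src, "action": "close", "path": "/foo/bar/README.txt"}, 67))  # 67
print(tracker.open_count(src))  # 1
tracker.on_timers(5_000)  # "User guide.txt" was never closed, so its state is dropped
print(tracker.open_count(src))  # 0
```

Unlike a window, the tracker has no boundary at which an open is forgotten: a close matches its open whenever it arrives, and only the timer decides when unmatched state is discarded.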
>
> So the memory used by my aggregation functions from window N will be freed
> and the storage will be effectively zero (modulo any follow-on processing
> that might be on a longer window).
>
> This seems to be what you are saying... in which case my naïve hope was
> not so naïve! w00t!
>
>>
>> On 08.02.2019 13:21, Stephen Connolly wrote:
>>
>> Ok, I'll try and map my problem into something that should be familiar to
>> most people.
>>
>> Consider a collection of PCs, each of which has a unique ID, e.g.
>> ca:fe:ba:be, de:ad:be:ef, etc.
>>
>> Each PC has a tree of local files. Some of the file paths are
>> coincidentally the same, but there is no file sharing between PCs.
>>
>> I need to produce metrics about how often files are opened and how long
>> they are open for.
>>
>> For every X-minute tumbling window, I need not just the cumulative
>> averages for each PC, but the averages for each file as well as the
>> cumulative averages for each folder and its sub-folders.
>>
>> I have a stream of events like
>>
>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
>> {"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}
>> {"source":"de:ad:be:ef","action":"open","path":"/foo/manchu/README.txt"}
>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User guide.txt"}
>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin guide.txt"}
>> {"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/User guide.txt","duration":"97"}
>> {"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/Admin guide.txt","duration":"196"}
>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/manchu/README.txt"}
>> {"source":"de:ad:be:ef","action":"open","path":"/bar/foo/README.txt"}
>>
>> So from that I would like to know stuff like:
>>
>> ca:fe:ba:be had 4/X opens per minute in the X minute window
>> ca:fe:ba:be had 3/X closes per minute in the X minute window and the
>> average time open was (67+97+196)/3=120...
>> there is no guarantee that the closes will be matched with opens in the
>> same window, which is why I'm only tracking them separately
>> de:ad:be:ef had 2/X opens per minute in the X minute window
>> ca:fe:ba:be /foo had 4/X opens per minute in the X minute window
>> ca:fe:ba:be /foo had 3/X closes per minute in the X minute window and the
>> average time open was 120
>> de:ad:be:ef /foo had 1/X opens per minute in the X minute window
>> de:ad:be:ef /bar had 1/X opens per minute in the X minute window
>> de:ad:be:ef /foo/manchu had 1/X opens per minute in the X minute window
>> de:ad:be:ef /bar/foo had 1/X opens per minute in the X minute window
>> de:ad:be:ef /foo/manchu/README.txt had 1/X opens per minute in the X minute window
>> de:ad:be:ef /bar/foo/README.txt had 1/X opens per minute in the X minute window
>> etc.
>>
>> What I think I want to do is turn each event into a series of events with
>> different keys, so that
>>
>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
>>
>> gets sent under the keys:
>>
>> ("ca:fe:ba:be","/")
>> ("ca:fe:ba:be","/foo")
>> ("ca:fe:ba:be","/foo/bar")
>> ("ca:fe:ba:be","/foo/bar/README.txt")
>>
>> Then I could use a window aggregation function to just:
>>
>> * count the "open" events
>> * count the "close" events and sum their duration
>>
>> Additionally, I am (naïvely) hoping that if a window has no events for a
>> particular key, the memory/storage costs are zero for that key.
>>
>> From what I can see, to achieve what I am trying to do, I could use a
>> flatMap followed by a keyBy.
>>
>> In other words, I take the events and flat-map them based on the path
>> split on '/', returning a Tuple of the (to-be) key and the event. Then I can
>> use keyBy to key on field 0 of the Tuple.
>>
>> My ask:
>>
>> Is the above design a good design? How would you achieve the end game
>> better?
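The key expansion Stephen describes (one copy of each event per ancestor folder, plus one for the file itself) can be sketched as a plain-Python function over a single event. In a Flink job this logic would presumably sit in the body of a `FlatMapFunction` that emits each pair through its collector, followed by `keyBy` on the first field; the function name `expand_keys` is hypothetical.

```python
def expand_keys(event):
    """Return one (key, event) pair per ancestor folder of the event's path,
    plus one for the file itself, with key = (source, path prefix)."""
    source = event["source"]
    keys = [(source, "/")]  # the root folder always gets a copy
    prefix = ""
    # Splitting "/foo/bar/README.txt" on "/" yields an empty first element; skip it.
    for part in event["path"].split("/"):
        if not part:
            continue
        prefix += "/" + part
        keys.append((source, prefix))
    return [(key, event) for key in keys]


event = {"source": "ca:fe:ba:be", "action": "open", "path": "/foo/bar/README.txt"}
for key, _ in expand_keys(event):
    print(key)
# ('ca:fe:ba:be', '/')
# ('ca:fe:ba:be', '/foo')
# ('ca:fe:ba:be', '/foo/bar')
# ('ca:fe:ba:be', '/foo/bar/README.txt')
```

Each event is emitted once per path segment plus once for the root, so the volume entering the keyed windows grows with the depth of the paths.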
>> Do I need to worry about many paths that are accessed rarely and
>> would have an accumulator function that stays at 0 unless there are events
>> in that window... or are the accumulators for each distinct key eagerly
>> purged after each time the trigger fires?
>>
>> What gotchas do I need to look out for?
>>
>> Thanks in advance, and apologies for the length
>>
>> -stephenc
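To check the numbers in Stephen's example, here is a plain-Python sketch of the per-window aggregation: one accumulator per (source, path prefix) key that counts opens, counts closes, and sums close durations, applied to the nine sample events from the thread. It loosely corresponds to what a Flink `AggregateFunction` (with its `createAccumulator`, `add`, `getResult` and `merge` methods) would hold per key and window, but the class and function names here are hypothetical, and all nine events are simply assumed to fall into one window. Per-minute rates would be these counts divided by X.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class OpenCloseAcc:
    """Per-key, per-window accumulator: opens, closes and summed close durations."""
    opens: int = 0
    closes: int = 0
    duration_sum: int = 0

    def add(self, event):
        if event["action"] == "open":
            self.opens += 1
        elif event["action"] == "close":
            self.closes += 1
            self.duration_sum += int(event["duration"])  # durations are strings in the events

    def average_duration(self):
        return self.duration_sum / self.closes if self.closes else None


def key_prefixes(source, path):
    """(source, "/"), (source, "/foo"), ..., (source, full path)."""
    keys, prefix = [(source, "/")], ""
    for part in filter(None, path.split("/")):
        prefix += "/" + part
        keys.append((source, prefix))
    return keys


def aggregate_window(events):
    # Accumulators are created only for keys that receive an event in this
    # window; the whole dict is dropped once the window's results are emitted.
    accs = defaultdict(OpenCloseAcc)
    for event in events:
        for key in key_prefixes(event["source"], event["path"]):
            accs[key].add(event)
    return dict(accs)


def ev(source, action, path, duration=None):
    event = {"source": source, "action": action, "path": path}
    if duration is not None:
        event["duration"] = duration
    return event


A, B = "ca:fe:ba:be", "de:ad:be:ef"
SAMPLE_EVENTS = [
    ev(A, "open", "/foo/bar/README.txt"),
    ev(A, "close", "/foo/bar/README.txt", "67"),
    ev(B, "open", "/foo/manchu/README.txt"),
    ev(A, "open", "/foo/bar/User guide.txt"),
    ev(A, "open", "/foo/bar/Admin guide.txt"),
    ev(A, "close", "/foo/bar/User guide.txt", "97"),
    ev(A, "close", "/foo/bar/Admin guide.txt", "196"),
    ev(A, "open", "/foo/manchu/README.txt"),
    ev(B, "open", "/bar/foo/README.txt"),
]

result = aggregate_window(SAMPLE_EVENTS)
root = result[(A, "/")]
print(root.opens, root.closes, root.average_duration())  # 4 3 120.0
print(result[(B, "/")].opens)  # 2
print((B, "/foo/bar") in result)  # False: no accumulator for an untouched key
```

Because `aggregate_window` starts from an empty dict for every window, a rarely accessed path costs nothing in windows where it receives no events, which matches Chesnay's description of keys only being present in windows that received an event for them.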