Hi Aljoscha,

Thank you for the detailed design document.
Wouldn't it be ok to allow these new concepts regardless of the time
semantics? For Event Time and Ingestion Time, "Lateness" and
"Accumulating/Discarding" make sense. If the user chooses Processing Time,
then these can be ignored during translation of the StreamGraph (possibly
with a warning).

Detecting when these concepts make sense should be possible by checking the
"Stream Characteristics" of the ExecutionEnvironment or the involved classes
(e.g. SlidingProcessingTimeWindows) in the StreamGraph. If the user uses a
custom WindowAssigner, then the user has to take care that it is used
correctly.

I don't like the "isEventTime()" method. Even with the additional method,
users could return 'true' there although they meant 'false', right? So this
does not really solve the problem that it is hard to distinguish Event Time
and Processing Time semantics in Flink.

Another approach that I could think of is getting rid of
'System.currentTimeMillis()' and only allowing time to be obtained via a
special interface that WindowAssigners implement. Then we could determine
what time is assigned and also verify that it is actually used (in contrast
to the isEventTime() method). Would that be an option or would it break the
API?

Cheers,
Max

On Tue, Apr 5, 2016 at 12:29 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

> By the way. The way I see to fix this is extending WindowAssigner with
> an "isEventTime()" method and then allowing accumulating/lateness in the
> WindowOperator only if this is true.
>
> But it seems a bit hacky because it special cases event-time. But then
> again, maybe we need to special case it ...
>
> On Tue, 5 Apr 2016 at 12:23 Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Hi Folks,
>> as part of my effort to improve the windowing in Flink [1] I also thought
>> about lateness, accumulating/discarding and window cleanup. I have some
>> ideas on this but I would love to get feedback from the community as I
>> think that these things are important for everyone doing event-time
>> windowing on Flink.
>>
>> The basic problem is this: Some elements can arrive behind the watermark
>> if the watermark is not 100 % correct (which it is not, in most cases, I
>> would assume). We need to provide an API that lets the user specify what
>> happens when these late elements arrive. There are two main knobs for the
>> user here:
>>
>> - Allowed Lateness: How late can an element be before it is completely
>> ignored, i.e. simply discarded
>>
>> - Accumulating/Discarding Fired Windows: When we fire a window, do we
>> purge the contents or do we keep it around until the watermark passes the
>> end of the window plus the allowed lateness? If we keep the window, a late
>> element will be added to the window and the window will be emitted again.
>> If we don't keep the window, then the late element will essentially
>> trigger emission of a one-element window.
>>
>> This is somewhat straightforward to implement: If accumulating, set a
>> timer for the end of the window plus the allowed lateness. Clean up the
>> window when that fires (basically). All in event-time with watermarks.
>>
>> My problem is only this: what should happen if the user specifies some
>> allowed lateness and/or accumulating mode but uses processing-time
>> windowing? For processing-time windows these don't make sense because
>> elements cannot be late by definition. The problem is that we cannot
>> figure out, by looking at a WindowAssigner or the Windows that it assigns
>> to elements, whether these windows are in the event-time or
>> processing-time domain. At the API level this is also not easily visible,
>> since a user might have set the "stream-time-characteristic" to event-time
>> but still use a processing-time window (plus trigger) in the program.
>>
>> Any ideas for solving this are extremely welcome. :-)
>>
>> Cheers,
>> Aljoscha
>>
>> [1]
>> https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit#heading=h.psfzjlv68tp
>>
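
A rough sketch of the "special interface" idea from this thread, combined with
the "ignore with a warning" behaviour for processing time. Nothing here is
Flink API: RecordingTimeContext, Assigner, TimeDomain and effectiveLateness
are hypothetical names invented for illustration, and probing the assigner
with a single dummy element is a simplifying assumption. The point is only
that the framework can observe which clock an assigner reads instead of
trusting a self-declared isEventTime() flag.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch, not Flink code.
class LatenessSketch {

    enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

    /** The only way an assigner can obtain a time; records which clock was read. */
    static final class RecordingTimeContext {
        private final long elementTimestamp;
        private final long processingTime;
        private final EnumSet<TimeDomain> used = EnumSet.noneOf(TimeDomain.class);

        RecordingTimeContext(long elementTimestamp, long processingTime) {
            this.elementTimestamp = elementTimestamp;
            this.processingTime = processingTime;
        }

        long elementTimestamp() {
            used.add(TimeDomain.EVENT_TIME);
            return elementTimestamp;
        }

        long currentProcessingTime() {
            used.add(TimeDomain.PROCESSING_TIME);
            return processingTime;
        }

        Set<TimeDomain> domainsUsed() {
            return used;
        }
    }

    /** Simplified assigner: returns the start of the window for one element. */
    interface Assigner {
        long windowStart(RecordingTimeContext ctx);
    }

    static Assigner tumblingEventTime(long size) {
        return ctx -> {
            long ts = ctx.elementTimestamp();
            return ts - Math.floorMod(ts, size);
        };
    }

    static Assigner tumblingProcessingTime(long size) {
        return ctx -> {
            long now = ctx.currentProcessingTime();
            return now - Math.floorMod(now, size);
        };
    }

    /**
     * Honours the requested lateness only if the assigner actually read event
     * time; otherwise ignores it and prints a warning.
     */
    static long effectiveLateness(Assigner assigner, long requestedLateness) {
        RecordingTimeContext probe = new RecordingTimeContext(0L, 0L);
        assigner.windowStart(probe); // dummy element, used only to observe clock access
        if (probe.domainsUsed().contains(TimeDomain.EVENT_TIME)) {
            return requestedLateness;
        }
        if (requestedLateness > 0) {
            System.err.println("warning: allowed lateness ignored for processing-time windows");
        }
        return 0L;
    }

    public static void main(String[] args) {
        System.out.println(effectiveLateness(tumblingEventTime(1000L), 500L));      // 500
        System.out.println(effectiveLateness(tumblingProcessingTime(1000L), 500L)); // 0
    }
}
```

In this sketch an assigner that reads both clocks is treated as event-time;
whether that is the right rule is an open design question, as is whether
routing all time access through such a context would break the existing API.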