Hi Martin,

first of all, sorry for taking so long to respond. This is valuable feedback and quite helpful to us.

I don't see, however, how this is unrelated to our current work on improving the windowing system. If I understood correctly, all of your issues should be addressed by our upcoming work. We are not just working on the internal implementation but also on making the API more streamlined and intuitive.

If everything goes according to plan, we should have a first version of the new API and internal operators in master by the end of the week. Stephan already opened his first pull request for an optimized processing-time windowing operator. I'm working on the general windowing operator that supports policies other than time. Once this is in we will focus on the API, and there we will take your feedback into account. :D

Cheers,
Aljoscha

On Thu, 17 Sep 2015 at 12:20 Martin Neumann <mneum...@sics.se> wrote:

> After some work experience with the current solution I want to give some
> feedback and maybe start a discussion about event time in streaming. This
> is not about watermarks or any of the upcoming improvements, just some
> observations from the current code.
>
> *Starttime for EventTime:*
>
> In the current implementation you can specify a start time; if you don't, it
> defaults to 0.
> The default is not feasible when using the typical milliseconds since 1970.
> The *TimeTriggerPolicy* has the following implementation of
> *preNotifyTrigger*:
>
> @Override
> public synchronized Object[] preNotifyTrigger(DATA datapoint) {
>     LinkedList<Object> fakeElements = new LinkedList<Object>();
>     // check if there is more then one window border missed
>     // use > here. In case >= would fit, the regular call will do the job.
>     while (timestampWrapper.getTimestamp(datapoint) >= startTime + granularity) {
>         startTime += granularity;
>         fakeElements.add(startTime - 1);
>     }
>     return (Object[]) fakeElements.toArray();
> }
>
> In practice this means that using the default starttime will crash the program
> (running out of memory), since it will create fake elements to close every
> possible window since 1970.
> So you need to set a starttime to make it run, which is not that simple. In
> production you could use the system time to initialize, but this might lead
> to some problems when consuming events from e.g. Kafka with an older
> timestamp. When debugging using old streams you need to know the lowest
> timestamp of the stream to initialize.
>
> What is the purpose of the fake elements? Is there a way to avoid the
> memory problem of creating enormous amounts of empty windows?
> Could we just use the timestamp of the first event processed as starttime
> instead of having it as a parameter? I am testing the following modification
> of the above code at the moment; do you see any problem with that?
>
> @Override
> public synchronized Object[] preNotifyTrigger(DATA datapoint) {
>     LinkedList<Object> fakeElements = new LinkedList<Object>();
>     // check if there is more then one window border missed
>     // use > here. In case >= would fit, the regular call will do the job.
>     // TODO modified here
>     if (startTime == 0) startTime = timestampWrapper.getTimestamp(datapoint);
>     while (timestampWrapper.getTimestamp(datapoint) >= startTime + granularity) {
>         startTime += granularity;
>         fakeElements.add(startTime - 1);
>     }
>     return (Object[]) fakeElements.toArray();
> }
>
> *EventTime api confusion:*
>
> I found several ways to use EventTime in my program but I find them not
> very intuitive.
> Compare the following two lines of code, both using the
> Time.of helper, one with system time and one with event time:
>
> ds.window(Time.of(long windowSize, TimeUnit))
> ds.window(Time.of(long windowSize, Timestamp yourTimeStampExtractor, long startTime))
>
> It's weird that you cannot specify the TimeUnit when using the event
> timestamp. It would feel more natural if it looked like this (also without
> the starttime):
>
> ds.window(Time.of(long windowSize, TimeUnit, Timestamp yourTimeStampExtractor))
>
> At the moment I'm using the modified TimeTriggerPolicy directly, leading to
> this ugly piece of code:
>
> .window(new TimeTriggerPolicyHack<DataPojo>(100000L,
>         new TimestampWrapper<DataPojo>(new EventTimeStampExtractor(), 0L)),
>     new TimeEvictionPolicy<DataPojo>(20000,
>         new TimestampWrapper<DataPojo>(new EventTimeStampExtractor(), 0L)))
>
> cheers Martin
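
To put a rough number on the start-time issue from the quoted message, here is a small self-contained sketch. It is not the Flink TimeTriggerPolicy: StartTimeSketch, FirstEventTrigger and missedBorders are hypothetical names made up for illustration. The example timestamp is an assumed value (midnight UTC on 17 Sep 2015, in milliseconds), and the 100000 ms granularity is borrowed from the snippet above. The sketch counts how many fake elements the quoted loop would create for a given start time, and shows one possible variant of the "use the first event as start time" idea that tracks initialization with a flag instead of treating 0 as "unset".

```java
import java.util.LinkedList;
import java.util.List;

public class StartTimeSketch {

    /**
     * Number of iterations the quoted while-loop would run, i.e. how many
     * fake elements it would create, computed without allocating them.
     */
    public static long missedBorders(long timestamp, long startTime, long granularity) {
        if (timestamp < startTime + granularity) {
            return 0; // the loop condition is false on the first check
        }
        return (timestamp - startTime) / granularity;
    }

    /**
     * Hypothetical stand-in for a trigger policy (not a Flink class).
     * The start time is taken from the first timestamp seen.
     */
    public static final class FirstEventTrigger {
        private final long granularity;
        private long startTime;
        private boolean initialized = false; // assumption: a flag instead of the "== 0" sentinel

        public FirstEventTrigger(long granularity) {
            this.granularity = granularity;
        }

        public List<Long> preNotifyTrigger(long timestamp) {
            List<Long> fakeElements = new LinkedList<>();
            if (!initialized) {
                startTime = timestamp; // first event defines the first window start
                initialized = true;
            }
            // same loop as in the quoted code: one fake element per crossed border
            while (timestamp >= startTime + granularity) {
                startTime += granularity;
                fakeElements.add(startTime - 1);
            }
            return fakeElements;
        }
    }

    public static void main(String[] args) {
        long firstEvent = 1_442_448_000_000L; // assumed example timestamp in ms
        long granularity = 100_000L;          // taken from the quoted snippet

        System.out.println("fake elements with startTime = 0: "
                + missedBorders(firstEvent, 0L, granularity));

        FirstEventTrigger trigger = new FirstEventTrigger(granularity);
        System.out.println("fake elements for first event: "
                + trigger.preNotifyTrigger(firstEvent).size());
        System.out.println("fake elements 250 s later: "
                + trigger.preNotifyTrigger(firstEvent + 250_000L).size());
    }
}
```

The flag is only one way to do this. The `startTime == 0` check in the quoted modification behaves the same for epoch-millisecond data, but it cannot tell an explicitly configured start time of 0 apart from "not set yet".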