Hi, I'm assuming you also have the call to assignTimestampsAndWatermarks() somewhere in there as well, as in:
stream .assignTimestampsAndWatermarks(new TimestampGenerator()) // or somewhere else in the pipeline .keyBy("id") .map(...) .filter(...) .map(...) .keyBy("areaID") .map(new KeyExtractor()) .keyBy("f1.areaID","f0.sinterval") .window(TumblingEventTimeWindows.of(Time.seconds(20))) .apply(new TrafficInformation()); Just checking, to make sure. If you have this we might have to dig a little deeper. Could you also please trying to bring the whole output of your apply() method in one go, i.e. collect all the output in a String and then have one call to System.out.println(), it could be that the output in the terminal is not completely in order. Cheers, Aljoscha On Mon, 2 Jan 2017 at 15:04 Nico <nico.franz...@gmail.com> wrote: > Hi Aljoscha, > > thank you for having a look. Actually there is not too much code based on > timestamps: > > stream > .keyBy("id") > .map(...) > .filter(...) > .map(...) > .keyBy("areaID") > .map(new KeyExtractor()) > .keyBy("f1.areaID","f0.sinterval") > .window(TumblingEventTimeWindows.of(Time.seconds(20))) > .apply(new TrafficInformation()); > > The map functions only enrich the data and don't change anything related > to the timestamp. > > the apply function is: > > @Override > public void apply( > Tuple key, > TimeWindow timeWindow, > Iterable<Tuple2<DirectionInterval, Car>> cars, > Collector<Tuple3<String, Double, Double>> out) throws Exception { > > System.out.println("Start: " +timeWindow.getStart()); > System.out.println("End: " +timeWindow.getEnd()); > > for(Tuple2<DirectionInterval, Car> t : cars){ > System.out.println(t.f1); > } > > System.out.println(t.f1) prints all information about a car, in which the > timestep is embedded. The System gets the timestamp with the class: > > public class TimestampGenerator extends > BoundedOutOfOrdernessTimestampExtractor <Car> { > > > public TimestampGenerator(Time maxOutOfOrderness){ > super(maxOutOfOrderness); > } > > @Override > public long extractTimestamp(Car car) { > return car.getTimestamp(); > } > > > Example output is presented in the previous post... it looks like the > timestamp is rounded... I am confused :-/ > > Best, > Nico > > 2016-12-23 19:41 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>: > > Hi, > could you please share code (and example data) for producing this output. > I'd like to have a look. > > Cheers, > Aljoscha > > On Wed, 21 Dec 2016 at 16:29 Nico <nico.franz...@gmail.com> wrote: > > Hi @all, > > I am using a TumblingEventTimeWindows.of(Time.seconds(20)) for testing. > During this I found a strange behavior (at least for me) in the assignment > of events. > > The first element of a new window is actually always part of the old > window. I thought the events are late, but then they they would be dropped > instead of assigned to the new window. Even with a allowedLateness of 10s > the behavior remains the same. > > The used timeWindow.getStart() and getEnd in order to get the boundaries > of the window. > > Can someone explain this? > > Best, > Nico > > > TimeWindows with Elements: > > Start: 1482332940000 - End: 1482332960000 > timestamp=1482332952907 > > Start: 1482332960000 - End: 1482332980000 > timestamp=1482332958929 > timestamp=1482332963995 > timestamp=1482332969027 > timestamp=1482332974039 > > Start: 1482332980000 - End: 1482333000000 > timestamp=1482332979059 > timestamp=1482332984072 > timestamp=1482332989081 > timestamp=1482332994089 > > Start: 1482333000000 - End: 1482333020000 > timestamp=1482332999113 > timestamp=1482333004123 > timestamp=1482333009132 > timestamp=1482333014144 > > >