Hi,
I'm assuming you also have the call to assignTimestampsAndWatermarks()
somewhere in there as well, as in:

stream
      .assignTimestampsAndWatermarks(new TimestampGenerator()) // or somewhere else in the pipeline
      .keyBy("id")
      .map(...)
      .filter(...)
      .map(...)
      .keyBy("areaID")
      .map(new KeyExtractor())
      .keyBy("f1.areaID","f0.sinterval")
      .window(TumblingEventTimeWindows.of(Time.seconds(20)))
      .apply(new TrafficInformation());

Just checking, to make sure. If you have this we might have to dig a little
deeper. Could you also please try to emit the whole output of your
apply() method in one go, i.e. collect all the output in a String and then
make a single call to System.out.println()? It could be that the output in the
terminal is not completely in order.
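
Something along these lines is what I have in mind. It is only a rough
sketch in plain Java without any Flink dependencies: the class WindowDebug
and the helpers describeWindow() and windowStart() are names I made up for
illustration, and the timestamps are copied from your earlier mail. The
window-start arithmetic assumes tumbling windows without an offset, which as
far as I know is timestamp - (timestamp % size).

```java
import java.util.Arrays;
import java.util.List;

public class WindowDebug {

    // Start of the tumbling window that should contain the timestamp.
    // Assumption: no window offset, so start = ts - (ts % size).
    static long windowStart(long timestamp, long sizeMillis) {
        return timestamp - (timestamp % sizeMillis);
    }

    // Builds the complete report for one window as a single String so it
    // can be printed with one call and cannot interleave with other output.
    static String describeWindow(long start, long end, List<Long> timestamps) {
        StringBuilder sb = new StringBuilder();
        sb.append("Start: ").append(start)
          .append(" - End: ").append(end).append('\n');
        for (long ts : timestamps) {
            sb.append("timestamp=").append(ts);
            // Flag elements that lie outside [start, end).
            if (ts < start || ts >= end) {
                sb.append("  <-- outside window, expected start ")
                  .append(windowStart(ts, end - start));
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Second window from the earlier mail.
        String report = describeWindow(
                1482332960000L, 1482332980000L,
                Arrays.asList(1482332958929L, 1482332963995L,
                              1482332969027L, 1482332974039L));
        System.out.print(report); // one call for the whole window
    }
}
```

In your apply() you would build the String from timeWindow.getStart(),
timeWindow.getEnd() and the elements of the Iterable, then print it once at
the end. If elements still show up as flagged, we know it is not just the
terminal mixing up the lines.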

Cheers,
Aljoscha

On Mon, 2 Jan 2017 at 15:04 Nico <nico.franz...@gmail.com> wrote:

> Hi Aljoscha,
>
> thank you for having a look. Actually there is not too much code based on
> timestamps:
>
> stream
>       .keyBy("id")
>       .map(...)
>       .filter(...)
>       .map(...)
>       .keyBy("areaID")
>       .map(new KeyExtractor())
>       .keyBy("f1.areaID","f0.sinterval")
>       .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>       .apply(new TrafficInformation());
>
> The map functions only enrich the data and don't change anything related
> to the timestamp.
>
> the apply function is:
>
> @Override
> public void apply(
> Tuple key,
> TimeWindow timeWindow,
> Iterable<Tuple2<DirectionInterval, Car>> cars,
> Collector<Tuple3<String, Double, Double>> out) throws Exception {
>
> System.out.println("Start: " +timeWindow.getStart());
> System.out.println("End: " +timeWindow.getEnd());
>
> for(Tuple2<DirectionInterval, Car> t : cars){
> System.out.println(t.f1);
> }
>
> System.out.println(t.f1) prints all information about a car, in which the
> timestamp is embedded. The system gets the timestamp with the class:
>
> public class TimestampGenerator extends
> BoundedOutOfOrdernessTimestampExtractor <Car> {
>
>
>     public TimestampGenerator(Time maxOutOfOrderness){
>         super(maxOutOfOrderness);
>     }
>
>     @Override
>     public long extractTimestamp(Car car) {
>         return car.getTimestamp();
>     }
> }
>
>
> Example output is presented in the previous post... it looks like the
> timestamp is rounded... I am confused :-/
>
> Best,
> Nico
>
> 2016-12-23 19:41 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>
> Hi,
> could you please share code (and example data) for producing this output.
> I'd like to have a look.
>
> Cheers,
> Aljoscha
>
> On Wed, 21 Dec 2016 at 16:29 Nico <nico.franz...@gmail.com> wrote:
>
> Hi @all,
>
> I am using a TumblingEventTimeWindows.of(Time.seconds(20)) for testing.
> During this I found a strange behavior (at least for me) in the assignment
> of events.
>
> The first element of a new window is actually always part of the old
> window. I thought the events were late, but then they would be dropped
> instead of assigned to the new window. Even with an allowedLateness of 10s
> the behavior remains the same.
>
> I used timeWindow.getStart() and getEnd() to get the boundaries
> of the window.
>
> Can someone explain this?
>
> Best,
> Nico
>
>
> TimeWindows with Elements:
>
> Start: 1482332940000 - End: 1482332960000
> timestamp=1482332952907
>
> Start: 1482332960000 - End: 1482332980000
> timestamp=1482332958929
> timestamp=1482332963995
> timestamp=1482332969027
> timestamp=1482332974039
>
> Start: 1482332980000 - End: 1482333000000
> timestamp=1482332979059
> timestamp=1482332984072
> timestamp=1482332989081
> timestamp=1482332994089
>
> Start: 1482333000000 - End: 1482333020000
> timestamp=1482332999113
> timestamp=1482333004123
> timestamp=1482333009132
> timestamp=1482333014144
>
>
>
