Hi, can anyone help me with this problem? I don't get it. Forget the examples below; I've created a copy-and-paste example that reproduces the incorrect results I get when combining key-value state with a window operator.
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<Tuple2<String, Long>> stream = env.fromElements(
                new Tuple2<>("1", 1485446260994L),
                new Tuple2<>("1", 1485446266012L),
                new Tuple2<>("1", 1485446271031L),
                new Tuple2<>("1", 1485446276040L),
                new Tuple2<>("1", 1485446281045L),
                new Tuple2<>("1", 1485446286049L),
                new Tuple2<>("1", 1485446291062L),
                new Tuple2<>("1", 1485446296066L),
                new Tuple2<>("1", 1485446302019L)
        );

        stream
            // the event-time timestamp is taken from field f1
            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(0)) {
                @Override
                public long extractTimestamp(Tuple2<String, Long> stringLongTuple2) {
                    return stringLongTuple2.f1;
                }
            })
            .keyBy("f0")
            .map(new MapTest())
            .keyBy("f0")
            .window(TumblingEventTimeWindows.of(Time.seconds(20)))
            .apply(new WindowFunction<Tuple2<String, Long>, Object, Tuple, TimeWindow>() {
                @Override
                public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<String, Long>> iterable,
                                  Collector<Object> collector) throws Exception {
                    Set<Long> set = new HashSet<>();
                    for (Tuple2<String, Long> t : iterable) {
                        set.add(t.f1);
                    }
                    // build the whole line first, then print it with a single call
                    StringBuilder sb = new StringBuilder();
                    sb.append("Window [" + timeWindow.getStart() + " " + timeWindow.getEnd() + "] ");
                    sb.append("Set " + set.toString());
                    System.out.println(sb.toString());
                }
            })
            .print();

        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }

    private static class MapTest extends RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

        private transient ValueState<Tuple2<String, Long>> state;

        @Override
        public Tuple2<String, Long> map(Tuple2<String, Long> stringLongTuple2) throws Exception {
            // emit the previously stored element (if any) and remember the current one
            Tuple2<String, Long> t = state.value();
            state.update(stringLongTuple2);
            if (t == null) return stringLongTuple2;
            return t;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Tuple2<String, Long>> vsd = new ValueStateDescriptor<>(
                    "lastEvent",
                    TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}),
                    null
            );
            state = getRuntimeContext().getState(vsd);
        }
    }
}

Output:

Window [1485446260000 1485446280000] Set [1485446271031, 1485446260994, 1485446266012]
Window [1485446280000 1485446300000] Set [1485446291062, 1485446281045, 1485446286049, 1485446276040]
Window [1485446300000 1485446320000] Set [1485446296066]

Best,
Nico

BTW, I am using Flink 1.1.3.

2017-01-16 12:18 GMT+01:00 Nico <nico.franz...@gmail.com>:

> Hi Aljoscha,
>
> I was able to identify the root cause of the problem: it is my first map
> function, the one using the ValueState. But first: assignTimestampsAndWatermarks()
> is called right after the Kafka connector is created:
>
> FlinkKafkaConsumer09<Car> carFlinkKafkaConsumer09 =
>         new FlinkKafkaConsumer09<>("Traffic", new Car(), properties);
>
> // Extract the timestamps with a max. delay of 2 s
> carFlinkKafkaConsumer09.assignTimestampsAndWatermarks(new TimestampGenerator(Time.seconds(0)));
>
> In the map function I try to calculate the direction between two GPS data
> points. For this, I store the last event in ValueState.
> The function looks like this:
>
> private static class BearingMap extends RichMapFunction<Car, Car> {
>
>     private transient ValueState<Car> state;
>     private final double maxdiff = 12; // in seconds
>
>     @Override
>     public Car map(Car destination) throws Exception {
>
>         Car origin = state.value();
>         double olat, olon, dlat, dlon;
>
>         /**
>          * If the state is empty, don't compute a direction; just store the event in the state
>          */
>         if (origin == null) {
>             state.update(destination);
>             // return the car unchanged
>             return destination;
>         }
>
>         double diff = origin.getTimestamp() - destination.getTimestamp();
>
>         System.out.println("Differenz: " + diff);
>
>         if (Math.abs(diff) <= maxdiff * 1000) {
>
>             /*
>              * For late events that still fall within the allowed delay
>              */
>             if (diff > 0) {
>                 Car tmp = destination;
>                 destination = origin;
>                 origin = tmp;
>             }
>
>             /*
>              * Car tmp is always the origin
>              */
>
>             double bearing = Helper.calculateBearing(
>                     origin.getLat(), origin.getLon(), destination.getLat(), destination.getLon());
>
>             // Update the state
>             state.update(destination);
>
>             origin.setDirection(bearing);
>             return origin;
>         }
>
>         // For events that are too late, keep the current state and return it without a direction
>         return origin;
>     }
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>
>         ValueStateDescriptor<Car> vsd = new ValueStateDescriptor<>(
>                 "lastEvent",
>                 Car.class,
>                 null
>         );
>
>         state = getRuntimeContext().getState(vsd);
>     }
> }
>
> Together with the window function:
>
> private static class TimeWindowTest implements WindowFunction<Car,
>         Tuple9<Double, Double, Double, Double, Double, Double, Double, Integer, List<String>>, Tuple, TimeWindow> {
>     @Override
>     public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Car> iterable,
>             Collector<Tuple9<Double, Double, Double, Double, Double, Double, Double, Integer, List<String>>> collector)
>             throws Exception {
>         String s = "Zeitfenster: " + timeWindow.getStart() + " - " + timeWindow.getEnd() + "\n";
>         Set<Long> timestamps = new HashSet<Long>();
>
>         for (Car c : iterable) {
>             timestamps.add(c.getTimestamp());
>         }
>
>         System.out.println(s + timestamps + "\n\n");
>     }
> }
>
> For this pipeline:
>
> stream
>     .filter(new FilterFunction<Car>() {
>         @Override
>         public boolean filter(Car car) throws Exception {
>             return car.getId().equals("car.330");
>         }
>     })
>     .keyBy("id")
>     .map(new BearingMap())
>     .keyBy("id")
>     .window(TumblingEventTimeWindows.of(Time.seconds(10)))
>     .apply(new TimeWindowTest());
>
> when an event e1 arrives at the map operator, it is stored in the
> ValueState, and only when the next element e2 arrives is e1 forwarded,
> i.e. about 5 seconds later. This produces the following output, in which
> one element is always around 5 seconds before the start of the window:
>
> Differenz: -5013.0
> Differenz: -5014.0
> Zeitfenster: 1484564690000 - 1484564700000 (Window times start and end)
> [1484564686236, 1484564691260]
>
> Differenz: -5009.0
> Differenz: -5007.0
> Zeitfenster: 1484564700000 - 1484564710000
> [1484564696273, 1484564701287]
>
> Differenz: -5005.0
> Differenz: -5014.0
> Zeitfenster: 1484564710000 - 1484564720000
> [1484564706296, 1484564711303]
>
> Best,
>
> Nico
>
>
> 2017-01-09 16:10 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>
>> Hi,
>> I'm assuming you also have the call to assignTimestampsAndWatermarks()
>> somewhere in there, as in:
>>
>> stream
>>     .assignTimestampsAndWatermarks(new TimestampGenerator()) // or somewhere else in the pipeline
>>     .keyBy("id")
>>     .map(...)
>>     .filter(...)
>>     .map(...)
>>     .keyBy("areaID")
>>     .map(new KeyExtractor())
>>     .keyBy("f1.areaID", "f0.sinterval")
>>     .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>     .apply(new TrafficInformation());
>>
>> Just checking, to make sure. If you have this, we might have to dig a
>> little deeper.
>> Could you also please try to print the whole output of your apply()
>> method in one go, i.e. collect all the output in a String and then make a
>> single call to System.out.println()? It could be that the output in the
>> terminal is not completely in order.
>>
>> Cheers,
>> Aljoscha
>>
>> On Mon, 2 Jan 2017 at 15:04 Nico <nico.franz...@gmail.com> wrote:
>>
>>> Hi Aljoscha,
>>>
>>> thank you for having a look. Actually, there is not much code that
>>> depends on timestamps:
>>>
>>> stream
>>>     .keyBy("id")
>>>     .map(...)
>>>     .filter(...)
>>>     .map(...)
>>>     .keyBy("areaID")
>>>     .map(new KeyExtractor())
>>>     .keyBy("f1.areaID", "f0.sinterval")
>>>     .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>>     .apply(new TrafficInformation());
>>>
>>> The map functions only enrich the data and don't change anything related
>>> to the timestamp.
>>>
>>> The apply function is:
>>>
>>> @Override
>>> public void apply(
>>>         Tuple key,
>>>         TimeWindow timeWindow,
>>>         Iterable<Tuple2<DirectionInterval, Car>> cars,
>>>         Collector<Tuple3<String, Double, Double>> out) throws Exception {
>>>
>>>     System.out.println("Start: " + timeWindow.getStart());
>>>     System.out.println("End: " + timeWindow.getEnd());
>>>
>>>     for (Tuple2<DirectionInterval, Car> t : cars) {
>>>         System.out.println(t.f1);
>>>     }
>>> }
>>>
>>> System.out.println(t.f1) prints all information about a car, including
>>> its embedded timestamp. The system gets the timestamp from this class:
>>>
>>> public class TimestampGenerator extends BoundedOutOfOrdernessTimestampExtractor<Car> {
>>>
>>>     public TimestampGenerator(Time maxOutOfOrderness) {
>>>         super(maxOutOfOrderness);
>>>     }
>>>
>>>     @Override
>>>     public long extractTimestamp(Car car) {
>>>         return car.getTimestamp();
>>>     }
>>> }
>>>
>>> Example output is in my previous post... it looks like the timestamp is
>>> rounded...
>>> I am confused :-/
>>>
>>> Best,
>>> Nico
>>>
>>> 2016-12-23 19:41 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>>>
>>> Hi,
>>> could you please share the code (and example data) that produces this
>>> output? I'd like to have a look.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Wed, 21 Dec 2016 at 16:29 Nico <nico.franz...@gmail.com> wrote:
>>>
>>> Hi @all,
>>>
>>> I am using TumblingEventTimeWindows.of(Time.seconds(20)) for testing.
>>> While doing so, I found a strange behavior (at least to me) in the
>>> assignment of events.
>>>
>>> The first element of each new window actually always belongs to the old
>>> window. I thought the events were late, but then they would be dropped
>>> instead of being assigned to the new window. Even with an allowedLateness
>>> of 10 s, the behavior remains the same.
>>>
>>> I used timeWindow.getStart() and getEnd() to get the boundaries of the
>>> window.
>>>
>>> Can someone explain this?
>>>
>>> Best,
>>> Nico
>>>
>>>
>>> TimeWindows with elements:
>>>
>>> Start: 1482332940000 - End: 1482332960000
>>> timestamp=1482332952907
>>>
>>> Start: 1482332960000 - End: 1482332980000
>>> timestamp=1482332958929
>>> timestamp=1482332963995
>>> timestamp=1482332969027
>>> timestamp=1482332974039
>>>
>>> Start: 1482332980000 - End: 1482333000000
>>> timestamp=1482332979059
>>> timestamp=1482332984072
>>> timestamp=1482332989081
>>> timestamp=1482332994089
>>>
>>> Start: 1482333000000 - End: 1482333020000
>>> timestamp=1482332999113
>>> timestamp=1482333004123
>>> timestamp=1482333009132
>>> timestamp=1482333014144
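For illustration, here is a minimal, Flink-free Java sketch of what the copy-and-paste repro at the top of this thread computes. It rests on two assumptions that are not stated in the thread: (1) the record emitted by `MapTest.map()` keeps the event-time timestamp of the input element that triggered the call, not the `f1` value it carries; (2) a zero-offset tumbling window assigner, i.e. `start = timestamp - timestamp % size`. The class `WindowAssignmentSketch` and its methods `windowStart` and `simulate` are hypothetical names used only for this sketch.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical, Flink-free simulation of the repro pipeline (not Flink code).
public class WindowAssignmentSketch {

    // Start of the tumbling window containing `timestamp`.
    // Assumption: zero offset and non-negative timestamps.
    public static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Mimics MapTest: the emitted payload is the previously stored element,
    // but (assumption) the record keeps the timestamp of the element that
    // triggered the map call. Returns window start -> set of emitted f1 values.
    public static Map<Long, TreeSet<Long>> simulate(long[] eventTimestamps, long size) {
        Map<Long, TreeSet<Long>> windows = new TreeMap<>();
        Long previous = null; // plays the role of the ValueState "lastEvent"
        for (long recordTs : eventTimestamps) {
            long payload = (previous == null) ? recordTs : previous;
            previous = recordTs; // state.update(current element)
            // the window is chosen by the record timestamp, not by the payload
            windows.computeIfAbsent(windowStart(recordTs, size), k -> new TreeSet<>())
                   .add(payload);
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] input = {
            1485446260994L, 1485446266012L, 1485446271031L,
            1485446276040L, 1485446281045L, 1485446286049L,
            1485446291062L, 1485446296066L, 1485446302019L
        };
        long size = 20_000L; // Time.seconds(20) in milliseconds
        for (Map.Entry<Long, TreeSet<Long>> e : simulate(input, size).entrySet()) {
            System.out.println("Window [" + e.getKey() + " " + (e.getKey() + size) + "] Set " + e.getValue());
        }
    }
}
```

Under these assumptions, `main` prints the same three windows with the same sets of `f1` values as the reported output, only sorted because a `TreeSet` is used:

    Window [1485446260000 1485446280000] Set [1485446260994, 1485446266012, 1485446271031]
    Window [1485446280000 1485446300000] Set [1485446276040, 1485446281045, 1485446286049, 1485446291062]
    Window [1485446300000 1485446320000] Set [1485446296066]

If the assumptions hold, each window would contain the `f1` value of the element preceding its first record, which would account for the one-element shift described in the thread.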