
Can anyone help me with this problem? I don't get it. Forget the examples
below; I've created a copy/paste example that reproduces the incorrect
results when using key-value state and a window operator.

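import java.util.HashSet;
import java.util.Set;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// NOTE: several lines of this example were lost in the archive. Lines marked
// "assumed" are reconstructed from context and may differ from the original.
public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(); // assumed
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // assumed

        DataStream<Tuple2<String, Long>> stream = env.fromElements(
                new Tuple2<>("1", 1485446260994L),
                new Tuple2<>("1", 1485446266012L),
                new Tuple2<>("1", 1485446271031L),
                new Tuple2<>("1", 1485446276040L),
                new Tuple2<>("1", 1485446281045L),
                new Tuple2<>("1", 1485446286049L),
                new Tuple2<>("1", 1485446291062L),
                new Tuple2<>("1", 1485446296066L),
                new Tuple2<>("1", 1485446302019L));

        stream
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> stringLongTuple2) {
                        return stringLongTuple2.f1;
                    }
                })
            .keyBy(0) // assumed
            .map(new MapTest())
            .keyBy(0) // assumed
            // assumed: 20 s windows, matching the window bounds in the output below
            .window(TumblingEventTimeWindows.of(Time.seconds(20)))
            .apply(new WindowFunction<Tuple2<String, Long>, Object, Tuple, TimeWindow>() {
                @Override
                public void apply(Tuple tuple, TimeWindow timeWindow,
                        Iterable<Tuple2<String, Long>> iterable, Collector<Object> collector)
                        throws Exception {

                    // collect the payload timestamps of all elements in this window
                    Set<Long> set = new HashSet<>();
                    for (Tuple2<String, Long> t : iterable) {
                        set.add(t.f1); // assumed
                    }

                    StringBuilder sb = new StringBuilder();
                    sb.append("Window [" + timeWindow.getStart() + " " + timeWindow.getEnd() + "] ");
                    sb.append("Set " + set.toString());
                    System.out.println(sb.toString()); // assumed
                }
            });

        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }

    private static class MapTest extends
            RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

        private transient ValueState<Tuple2<String, Long>> state;

        @Override
        public Tuple2<String, Long> map(Tuple2<String, Long> stringLongTuple2)
                throws Exception {

            // read the previously stored element, then store the current one
            Tuple2<String, Long> t = state.value();
            state.update(stringLongTuple2); // assumed; consistent with the output below

            if (t == null) return stringLongTuple2;

            return t;
        }

        @Override
        public void open(Configuration parameters) throws Exception {

            ValueStateDescriptor<Tuple2<String, Long>> vsd = new ValueStateDescriptor<>(
                    "lastEvent", // assumed: original name lost; borrowed from BearingMap below
                    TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}),
                    null); // assumed default value

            state = getRuntimeContext().getState(vsd);
        }
    }
}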

Window [1485446260000 1485446280000] Set [1485446271031, 1485446260994,
Window [1485446280000 1485446300000] Set [1485446291062, 1485446281045,
1485446286049, 1485446276040]
Window [1485446300000 1485446320000] Set [1485446296066]
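
The output above can be reproduced without Flink. The following is a minimal plain-Java sketch, not the actual job: the class and method names are hypothetical, and it assumes that the record emitted by the map keeps the event-time timestamp of the element that triggered it, while its payload is the previously stored element. It also assumes tumbling windows with zero offset, so the window start is the timestamp rounded down to a multiple of the window size.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class LaggingMapSimulation {

    /** Start of the tumbling window (zero offset) containing a non-negative timestamp. */
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    /**
     * Simulates a map that emits the previously stored element. Each emitted
     * payload is grouped by the window of the incoming element's timestamp,
     * not by the timestamp stored in the payload itself.
     */
    static Map<Long, Set<Long>> simulate(long[] timestamps, long windowSize) {
        Map<Long, Set<Long>> windows = new TreeMap<>();
        Long state = null; // stands in for the ValueState
        for (long incoming : timestamps) {
            // first element: nothing stored yet, so it is emitted unchanged
            long emitted = (state == null) ? incoming : state;
            state = incoming;
            windows.computeIfAbsent(windowStart(incoming, windowSize), k -> new TreeSet<>())
                   .add(emitted);
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] timestamps = {
            1485446260994L, 1485446266012L, 1485446271031L,
            1485446276040L, 1485446281045L, 1485446286049L,
            1485446291062L, 1485446296066L, 1485446302019L
        };
        long size = 20_000L;
        simulate(timestamps, size).forEach((start, payloads) ->
            System.out.println("Window [" + start + " " + (start + size) + "] Set " + payloads));
    }
}
```

Under these assumptions the last window contains only 1485446296066, and the middle window contains 1485446276040 even though that value is smaller than the window start. The sets are printed in sorted order here, unlike the HashSet in the job.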


BTW ... I am using Flink 1.1.3.

2017-01-16 12:18 GMT+01:00

Hi Aljoscha,
> I was able to identify the root cause of the problem. It is my first map
> function, which uses the ValueState. But first, assignTimestampsAndWatermarks()
> is called right after the Kafka connector is created:
> FlinkKafkaConsumer09<Car> carFlinkKafkaConsumer09  =
>       new FlinkKafkaConsumer09<>("Traffic", new Car(), properties);
> // Extract the timestamps with a max. delay of 2 s
> carFlinkKafkaConsumer09.assignTimestampsAndWatermarks(
>       new TimestampGenerator(Time.seconds(0)));
> In the map function I try to calculate the direction between two GPS data 
> points. For this, I store the last event in ValueState. The function looks 
> like this:
> private static class BearingMap extends RichMapFunction<Car, Car> {
>    private transient ValueState<Car> state;
>    private final double maxdiff = 12; // in seconds
>    @Override
>    public Car map(Car destination) throws Exception {
>       Car origin = state.value();
>       double olat, olon, dlat, dlon;
>       /**
>        *  If the state is empty, do not compute a direction; just store
>        *  the event in the state
>        */
>       if (origin == null){
>          state.update(destination);
>          // return the Car unchanged
>          return destination;
>       }
>       double diff = origin.getTimestamp()-destination.getTimestamp();
>            System.out.println("Differenz: " +diff);
>            if(Math.abs(diff) <= maxdiff*1000){
>          /*
>           * For late events that still fall within the allowed delay
>           */
>          if(diff > 0){
>             Car tmp = destination;
>             destination = origin;
>             origin = tmp;
>          }
>          /*
>           * Car tmp is always the origin
>           */
>          double bearing = Helper.calculateBearing(
> origin.getLat(),origin.getLon(),destination.getLat(),destination.getLon());
>          // Update the state
>          state.update(destination);
>          origin.setDirection(bearing);
>          return origin;
>       }
>       // For events that are too late, keep the current state and return it
>       // without a direction
>          return origin;
>    }
>    @Override
>    public void open(Configuration parameters) throws Exception {
>       ValueStateDescriptor<Car> vsd = new ValueStateDescriptor<>(
>             "lastEvent",
>             Car.class,
>             null
>       );
>       state = getRuntimeContext().getState(vsd);
>    }
> }
> Together with the window function:
> private static class TimeWindowTest implements WindowFunction<Car, 
> Tuple9<Double, Double, Double, Double, Double, Double, Double, Integer, 
> List<String>>, Tuple, TimeWindow> {
>     @Override
>     public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Car> 
> iterable, Collector<Tuple9<Double, Double, Double, Double, Double, Double, 
> Double, Integer, List<String>>> collector) throws Exception {
>         String s = "Zeitfenster: " +timeWindow.getStart() +" - " + 
> timeWindow.getEnd() +"\n";
>         Set<Long> timestamps = new HashSet<Long>();
>         for( Car c : iterable){
>             timestamps.add(c.getTimestamp());
>         }
>         System.out.println( s +timestamps +"\n\n");
>     }
> }
> I get the following for this pipeline:
> stream
>    .filter(new FilterFunction<Car>() {
>       @Override
>       public boolean filter(Car car) throws Exception {
>          return car.getId().equals("car.330");
>       }
>    })
>              .keyBy("id")
>              .map(new BearingMap())
>              .keyBy("id")
>              .window(TumblingEventTimeWindows.of(Time.seconds(10)))
>              .apply(new TimeWindowTest());
> So actually, when an event e1 arrives at the map operator, it is stored in
> ValueState, and e1 is only forwarded once the next element e2 arrives.
> That is 5 seconds later. This produces the following outcome: one element
> always has a timestamp around 5 seconds before the start of the window.
> Differenz: -5013.0
> Differenz: -5014.0
> Zeitfenster: 1484564690000 - 1484564700000 (Window times start and end)
> [1484564686236, 1484564691260]
> Differenz: -5009.0
> Differenz: -5007.0
> Zeitfenster: 1484564700000 - 1484564710000
> [1484564696273, 1484564701287]
> Differenz: -5005.0
> Differenz: -5014.0
> Zeitfenster: 1484564710000 - 1484564720000
> [1484564706296, 1484564711303]
Best,
Nico
2017-01-09 16:10 GMT+01:00
Hi,
>> I'm assuming you also have the call to assignTimestampsAndWatermarks()
>> somewhere in there as well, as in:
>> stream
>>       .assignTimestampsAndWatermarks(new TimestampGenerator()) // or somewhere else in the pipeline
>>       .keyBy("id")
>>       .map(...)
>>       .filter(...)
>>       .map(...)
>>       .keyBy("areaID")
>>       .map(new KeyExtractor())
>>       .keyBy("f1.areaID","f0.sinterval")
>>       .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>       .apply(new TrafficInformation());
>> Just checking, to make sure. If you have this, we might have to dig a
>> little deeper. Could you also please try to print the whole output of
>> your apply() method in one go, i.e. collect all the output in a String and
>> then make a single call to System.out.println()? It could be that the
>> output in the terminal is not completely in order.
>> Cheers,
>> Aljoscha
On Mon, 2 Jan 2017 at 15:04
Hi Aljoscha,
>>> thank you for having a look. Actually there is not too much code based
>>> on timestamps:
>>> stream
>>>       .keyBy("id")
>>>       .map(...)
>>>       .filter(...)
>>>       .map(...)
>>>       .keyBy("areaID")
>>>       .map(new KeyExtractor())
>>>       .keyBy("f1.areaID","f0.sinterval")
>>>       .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>>       .apply(new TrafficInformation());
>>> The map functions only enrich the data and don't change anything related
>>> to the timestamp.
>>> the apply function is:
>>> @Override
>>> public void apply(
>>> Tuple key,
>>> TimeWindow timeWindow,
>>> Iterable<Tuple2<DirectionInterval, Car>> cars,
>>> Collector<Tuple3<String, Double, Double>> out) throws Exception {
>>> System.out.println("Start: " +timeWindow.getStart());
>>> System.out.println("End: " +timeWindow.getEnd());
>>> for(Tuple2<DirectionInterval, Car> t : cars){
>>> System.out.println(t.f1);
>>> }
>>> }
>>> System.out.println(t.f1) prints all information about a car, in which
>>> the timestamp is embedded. The system gets the timestamp from the class:
>>> public class TimestampGenerator
>>>         extends BoundedOutOfOrdernessTimestampExtractor<Car> {
>>>     public TimestampGenerator(Time maxOutOfOrderness){
>>>         super(maxOutOfOrderness);
>>>     }
>>>     @Override
>>>     public long extractTimestamp(Car car) {
>>>         return car.getTimestamp();
>>>     }
>>> }
>>> Example output is presented in the previous post... it looks like the
>>> timestamp is rounded... I am confused :-/
Best,
Nico
2016-12-23 19:41 GMT+01:00
Hi,
>>> could you please share code (and example data) for producing this
>>> output. I'd like to have a look.
Cheers,
Aljoscha
On Wed, 21 Dec 2016 at 16:29
Hi @all,
>>> I am using a TumblingEventTimeWindows.of(Time.seconds(20)) for testing.
>>> During this I found a strange behavior (at least for me) in the assignment
>>> of events.
>>> The first element of a new window actually always belongs to the old
>>> window. I thought the events were late, but then they would be dropped
>>> instead of being assigned to the new window. Even with an allowedLateness
>>> of 10s the behavior remains the same.
>>> I used timeWindow.getStart() and getEnd() to get the boundaries
>>> of the window.
>>> Can someone explain this?
Best,
Nico
>>> TimeWindows with Elements:
>>> Start: 1482332940000 - End: 1482332960000
>>> timestamp=1482332952907
>>> Start: 1482332960000 - End: 1482332980000
>>> timestamp=1482332958929
>>> timestamp=1482332963995
>>> timestamp=1482332969027
>>> timestamp=1482332974039
>>> Start: 1482332980000 - End: 1482333000000
>>> timestamp=1482332979059
>>> timestamp=1482332984072
>>> timestamp=1482332989081
>>> timestamp=1482332994089
>>> Start: 1482333000000 - End: 1482333020000
>>> timestamp=1482332999113
>>> timestamp=1482333004123
>>> timestamp=1482333009132
>>> timestamp=1482333014144
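
The mismatch in the listing above can be checked by hand. The sketch below (plain Java, hypothetical class and method names) assumes tumbling windows with zero offset and non-negative timestamps, so that the window start is the timestamp rounded down to a multiple of the window size. It computes which 20-second window the first printed timestamp of each reported window would fall into if the window were chosen by that printed value.

```java
public class WindowStartCheck {

    /** Start of the tumbling window (zero offset) containing a non-negative timestamp. */
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    public static void main(String[] args) {
        long size = 20_000L; // Time.seconds(20) in milliseconds
        // first element printed in the 2nd, 3rd and 4th reported windows
        long[] reported = {1482332958929L, 1482332979059L, 1482332999113L};
        for (long ts : reported) {
            long start = windowStart(ts, size);
            System.out.println(ts + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```

Under that assumption, each of these printed timestamps falls into the window before the one it was reported in. This suggests that the timestamp used for window assignment is not the same value as the one printed from the element.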

