Yes, that's true.

On Fri, 27 Jan 2017 at 13:16 Nico <nico.franz...@gmail.com> wrote:
> Hi Aljoscha,
>
> got it!!! :) Thank you. So, in order to retain the "original" timestamps, it would be necessary to assign the timestamps after the MapFunction instead of at the Kafka source? At least, this solves the issue in the example.
>
> Best,
> Nico
>
> 2017-01-27 11:49 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>
> Now I see. What you're doing in this example is basically reassigning timestamps to other elements in your stateful MapFunction. Flink internally keeps track of the timestamp of an element. This can normally not be changed, except by using a TimestampAssigner, which you're doing. Now, the output from a MapFunction has the same timestamp as the input element. By keeping an element in state and emitting it when the next element arrives, you emit it with the timestamp of that next element, and that's the reason why they end up in the "wrong" windows.
>
> Does that help?
>
> -
> Aljoscha
>
> On Thu, 26 Jan 2017 at 19:17 Nico <nico.franz...@gmail.com> wrote:
>
> Hi,
>
> can anyone help me with this problem? I don't get it. Forget the examples below; I've created a copy/paste example to reproduce the problem of incorrect results when using key-value state and windowOperator.
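Aljoscha's explanation can be checked without a cluster. What follows is a minimal plain-Java sketch, not Flink code. It models each element as a payload (the tuple's `f1`) plus the timestamp Flink tracks internally, and it replays the nine tuples from the StreamingJob example below. Keying is omitted because every tuple has key "1". The sketch assumes zero-offset 20-second tumbling windows and no late data. The names `DelayedEmitSimulation`, `statefulMap` and `restamp` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical simulation of: stateful map that emits the previous element,
// followed by 20 s tumbling event-time windows. No Flink dependency.
final class DelayedEmitSimulation {

    static final class Stamped {
        final long value;     // payload, i.e. the tuple's f1 field
        final long timestamp; // timestamp attached to the element by the runtime
        Stamped(long value, long timestamp) { this.value = value; this.timestamp = timestamp; }
    }

    static final long WINDOW_SIZE = 20_000L;

    static final long[] INPUT = {
        1485446260994L, 1485446266012L, 1485446271031L, 1485446276040L, 1485446281045L,
        1485446286049L, 1485446291062L, 1485446296066L, 1485446302019L
    };

    // Start of the tumbling window containing ts (zero offset, non-negative ts).
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_SIZE);
    }

    // Mimics MapTest: emit the stored element (or the current one if the state
    // is empty). The output keeps the timestamp of the CURRENT input element.
    static List<Stamped> statefulMap(long[] input) {
        List<Stamped> out = new ArrayList<>();
        Long previous = null; // stands in for ValueState
        for (long ts : input) {
            long emitted = (previous == null) ? ts : previous;
            previous = ts;
            out.add(new Stamped(emitted, ts));
        }
        return out;
    }

    // Models assigning timestamps after the map: the timestamp is taken from the payload.
    static List<Stamped> restamp(List<Stamped> mapped) {
        List<Stamped> out = new ArrayList<>();
        for (Stamped s : mapped) {
            out.add(new Stamped(s.value, s.value));
        }
        return out;
    }

    // Groups payloads by window start, like the HashSet in the WindowFunction.
    static Map<Long, TreeSet<Long>> windows(List<Stamped> elements) {
        Map<Long, TreeSet<Long>> result = new TreeMap<>();
        for (Stamped s : elements) {
            result.computeIfAbsent(windowStart(s.timestamp), k -> new TreeSet<>()).add(s.value);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Stamped> mapped = statefulMap(INPUT);
        System.out.println("timestamps assigned at the source: " + windows(mapped));
        System.out.println("timestamps assigned after the map: " + windows(restamp(mapped)));
    }
}
```

With timestamps assigned at the source, the simulated sets are [1485446260994, 1485446266012, 1485446271031], [1485446276040, 1485446281045, 1485446286049, 1485446291062] and [1485446296066]. These match the output Nico reports. Restamping after the map places every payload in the window of its own timestamp. The last tuple (1485446302019) is never emitted in either case, because it is still held in state when the input ends.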
>
>
> public class StreamingJob {
>
>     public static void main(String[] args) throws Exception {
>         // set up the streaming execution environment
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>         DataStream<Tuple2<String,Long>> stream = env.fromElements(
>             new Tuple2<>("1",1485446260994L),
>             new Tuple2<>("1",1485446266012L),
>             new Tuple2<>("1",1485446271031L),
>             new Tuple2<>("1",1485446276040L),
>             new Tuple2<>("1",1485446281045L),
>             new Tuple2<>("1",1485446286049L),
>             new Tuple2<>("1",1485446291062L),
>             new Tuple2<>("1",1485446296066L),
>             new Tuple2<>("1",1485446302019L)
>         );
>
>         stream
>             .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(0)) {
>                 @Override
>                 public long extractTimestamp(Tuple2<String, Long> stringLongTuple2) {
>                     return stringLongTuple2.f1;
>                 }
>             })
>             .keyBy("f0")
>             .map(new MapTest())
>             .keyBy("f0")
>             .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>             .apply(new WindowFunction<Tuple2<String,Long>, Object, Tuple, TimeWindow>() {
>                 @Override
>                 public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<String, Long>> iterable, Collector<Object> collector) throws Exception {
>
>                     Set<Long> set = new HashSet<>();
>                     for(Tuple2<String,Long> t : iterable){
>                         set.add(t.f1);
>                     }
>
>                     StringBuilder sb = new StringBuilder();
>
>                     sb.append("Window [" +timeWindow.getStart() +" " +timeWindow.getEnd() +"] ");
>                     sb.append("Set " +set.toString());
>                     System.out.println(sb.toString());
>                 }
>             })
>             .print();
>
>
>         // execute program
>         env.execute("Flink Streaming Java API Skeleton");
>     }
>
>     private static class MapTest extends RichMapFunction<Tuple2<String,Long>,Tuple2<String,Long>> {
>
>         private transient ValueState<Tuple2<String, Long>> state;
>
>         @Override
>         public Tuple2<String, Long> map(Tuple2<String, Long> stringLongTuple2) throws Exception {
>
>             Tuple2<String,Long> t = state.value();
>
>             state.update(stringLongTuple2);
>
>             if(t == null) return stringLongTuple2;
>
>             return t;
>         }
>
>         @Override
>         public void open(Configuration parameters) throws Exception {
>
>             ValueStateDescriptor<Tuple2<String,Long>> vsd = new ValueStateDescriptor<>(
>                 "lastEvent",
>                 TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}),
>                 null
>             );
>
>             state = getRuntimeContext().getState(vsd);
>         }
>     }
> }
>
>
> Output:
>
> Window [1485446260000 1485446280000] Set [1485446271031, 1485446260994, 1485446266012]
> Window [1485446280000 1485446300000] Set [1485446291062, 1485446281045, 1485446286049, 1485446276040]
> Window [1485446300000 1485446320000] Set [1485446296066]
>
> Best,
> Nico
>
> BTW ... I am using Flink 1.1.3.
>
>
> 2017-01-16 12:18 GMT+01:00 Nico <nico.franz...@gmail.com>:
>
> Hi Aljoscha,
>
> I was able to identify the root cause of the problem. It is my first map function, the one using the ValueState. But first, assignTimestampsAndWatermarks() is called after the Kafka connector is created:
>
> FlinkKafkaConsumer09<Car> carFlinkKafkaConsumer09 =
>     new FlinkKafkaConsumer09<>("Traffic", new Car(), properties);
>
> // Extract the timestamps with a max. delay of 2s
> carFlinkKafkaConsumer09.assignTimestampsAndWatermarks(new TimestampGenerator(Time.seconds(0)));
>
> In the map function I try to calculate the direction between two GPS data points. For this, I store the last event in ValueState.
> The function looks like this:
>
> private static class BearingMap extends RichMapFunction<Car, Car> {
>
>     private transient ValueState<Car> state;
>     private final double maxdiff = 12; // in seconds
>
>     @Override
>     public Car map(Car destination) throws Exception {
>
>         Car origin = state.value();
>         double olat, olon, dlat, dlon;
>
>         /**
>          * If the state is empty, don't compute a direction; only store the event in the state
>          */
>         if (origin == null){
>             state.update(destination);
>             // return the Car unchanged
>             return destination;
>         }
>
>         double diff = origin.getTimestamp()-destination.getTimestamp();
>
>         System.out.println("Differenz: " +diff);
>
>         if(Math.abs(diff) <= maxdiff*1000){
>
>             /*
>              * For late events that still fall within the allowed delay
>              */
>             if(diff > 0){
>                 Car tmp = destination;
>                 destination = origin;
>                 origin = tmp;
>             }
>
>             /*
>              * Car tmp is always the origin
>              */
>
>             double bearing = Helper.calculateBearing(
>                 origin.getLat(),origin.getLon(),destination.getLat(),destination.getLon());
>
>             // Update the state
>             state.update(destination);
>
>             origin.setDirection(bearing);
>             return origin;
>
>         }
>
>         // For events that are too late, keep the current state and return it without a direction
>         return origin;
>
>     }
>
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>
>         ValueStateDescriptor<Car> vsd = new ValueStateDescriptor<>(
>             "lastEvent",
>             Car.class,
>             null
>         );
>
>         state = getRuntimeContext().getState(vsd);
>     }
>
> }
>
> Together with the window function:
>
>
> private static class TimeWindowTest implements WindowFunction<Car, Tuple9<Double, Double, Double, Double, Double, Double, Double, Integer, List<String>>, Tuple, TimeWindow> {
>     @Override
>     public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Car> iterable, Collector<Tuple9<Double, Double, Double, Double, Double, Double, Double, Integer, List<String>>> collector) throws Exception {
>         String s = "Zeitfenster: "
>             +timeWindow.getStart() +" - " + timeWindow.getEnd() +"\n";
>         Set<Long> timestamps = new HashSet<Long>();
>
>         for( Car c : iterable){
>             timestamps.add(c.getTimestamp());
>         }
>
>         System.out.println( s +timestamps +"\n\n");
>     }
> }
>
> I get the following for:
>
> stream
>     .filter(new FilterFunction<Car>() {
>         @Override
>         public boolean filter(Car car) throws Exception {
>             return car.getId().equals("car.330");
>         }
>     })
>     .keyBy("id")
>     .map(new BearingMap())
>     .keyBy("id")
>     .window(TumblingEventTimeWindows.of(Time.seconds(10)))
>     .apply(new TimeWindowTest());
>
> So actually, when an event e1 arrives at the map operator, it is stored in ValueState, and only when the next element e2 arrives is e1 forwarded. This happens about 5 seconds later and generates the following outcome: one element is always around 5 seconds before the start of the window.
>
> Differenz: -5013.0
> Differenz: -5014.0
> Zeitfenster: 1484564690000 - 1484564700000 (Window times start and end)
> [1484564686236, 1484564691260]
>
>
> Differenz: -5009.0
> Differenz: -5007.0
> Zeitfenster: 1484564700000 - 1484564710000
> [1484564696273, 1484564701287]
>
>
> Differenz: -5005.0
> Differenz: -5014.0
> Zeitfenster: 1484564710000 - 1484564720000
> [1484564706296, 1484564711303]
>
>
> Best,
>
> Nico
>
>
> 2017-01-09 16:10 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>
> Hi,
> I'm assuming you also have the call to assignTimestampsAndWatermarks() somewhere in there as well, as in:
>
> stream
>     .assignTimestampsAndWatermarks(new TimestampGenerator()) // or somewhere else in the pipeline
>     .keyBy("id")
>     .map(...)
>     .filter(...)
>     .map(...)
>     .keyBy("areaID")
>     .map(new KeyExtractor())
>     .keyBy("f1.areaID","f0.sinterval")
>     .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>     .apply(new TrafficInformation());
>
> Just checking, to make sure. If you have this, we might have to dig a little deeper. Could you also please try to bring the whole output of your apply() method in one go, i.e.
> collect all the output in a String and then have one call to System.out.println()? It could be that the output in the terminal is not completely in order.
>
> Cheers,
> Aljoscha
>
> On Mon, 2 Jan 2017 at 15:04 Nico <nico.franz...@gmail.com> wrote:
>
> Hi Aljoscha,
>
> thank you for having a look. Actually, there is not much code that depends on timestamps:
>
> stream
>     .keyBy("id")
>     .map(...)
>     .filter(...)
>     .map(...)
>     .keyBy("areaID")
>     .map(new KeyExtractor())
>     .keyBy("f1.areaID","f0.sinterval")
>     .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>     .apply(new TrafficInformation());
>
> The map functions only enrich the data and don't change anything related to the timestamp.
>
> The apply function is:
>
> @Override
> public void apply(
>         Tuple key,
>         TimeWindow timeWindow,
>         Iterable<Tuple2<DirectionInterval, Car>> cars,
>         Collector<Tuple3<String, Double, Double>> out) throws Exception {
>
>     System.out.println("Start: " +timeWindow.getStart());
>     System.out.println("End: " +timeWindow.getEnd());
>
>     for(Tuple2<DirectionInterval, Car> t : cars){
>         System.out.println(t.f1);
>     }
> }
>
> System.out.println(t.f1) prints all information about a car, including the embedded timestamp. The system gets the timestamp from this class:
>
> public class TimestampGenerator extends BoundedOutOfOrdernessTimestampExtractor<Car> {
>
>     public TimestampGenerator(Time maxOutOfOrderness){
>         super(maxOutOfOrderness);
>     }
>
>     @Override
>     public long extractTimestamp(Car car) {
>         return car.getTimestamp();
>     }
> }
>
> Example output is presented in the previous post... it looks like the timestamp is rounded... I am confused :-/
>
> Best,
> Nico
>
> 2016-12-23 19:41 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>
> Hi,
> could you please share code (and example data) for producing this output? I'd like to have a look.
>
> Cheers,
> Aljoscha
>
> On Wed, 21 Dec 2016 at 16:29 Nico <nico.franz...@gmail.com> wrote:
>
> Hi @all,
>
> I am using TumblingEventTimeWindows.of(Time.seconds(20)) for testing. During this I found a strange behavior (at least to me) in the assignment of events.
>
> Judging by its timestamp, the first element of each new window actually always belongs to the old window. I thought the events were late, but then they would be dropped instead of being assigned to the new window. Even with an allowedLateness of 10s the behavior remains the same.
>
> I used timeWindow.getStart() and getEnd() to get the boundaries of the window.
>
> Can someone explain this?
>
> Best,
> Nico
>
>
> TimeWindows with Elements:
>
> Start: 1482332940000 - End: 1482332960000
> timestamp=1482332952907
>
> Start: 1482332960000 - End: 1482332980000
> timestamp=1482332958929
> timestamp=1482332963995
> timestamp=1482332969027
> timestamp=1482332974039
>
> Start: 1482332980000 - End: 1482333000000
> timestamp=1482332979059
> timestamp=1482332984072
> timestamp=1482332989081
> timestamp=1482332994089
>
> Start: 1482333000000 - End: 1482333020000
> timestamp=1482332999113
> timestamp=1482333004123
> timestamp=1482333009132
> timestamp=1482333014144
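The window boundaries are not rounded timestamps. They come from a fixed rule, sketched below in plain Java as an illustration rather than Flink's implementation. `windowStart` is assumed to mirror the formula in Flink's `TimeWindow.getWindowStartWithOffset`. `isDropped` is a simplified model of the window operator's lateness check: an element is dropped once the watermark reaches its window's last timestamp plus the allowed lateness. The class name `TumblingWindowMath` and the watermark values used with it are hypothetical.

```java
// Hypothetical helper illustrating tumbling-window assignment and lateness.
final class TumblingWindowMath {

    // Start of the window that contains `timestamp` (assumed to match Flink's formula).
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    // A window [start, start + size) covers timestamps start .. start + size - 1.
    static long maxTimestamp(long start, long size) {
        return start + size - 1;
    }

    // Simplified lateness model: dropped once the watermark has reached the
    // window's last timestamp plus the allowed lateness. An element is never
    // moved to a later window; it either goes to its own window or is dropped.
    static boolean isDropped(long timestamp, long size, long allowedLateness, long watermark) {
        long start = windowStart(timestamp, 0, size);
        return maxTimestamp(start, size) + allowedLateness <= watermark;
    }

    public static void main(String[] args) {
        long size = 20_000L;
        long printed = 1482332958929L; // timestamp field printed for window 1482332960000 - 1482332980000
        long next = 1482332963995L;    // the element that followed it
        long own = windowStart(printed, 0, size);
        long viaNext = windowStart(next, 0, size);
        System.out.println("by its own timestamp: [" + own + ", " + (own + size) + ")");
        System.out.println("by the next element's timestamp: [" + viaNext + ", " + (viaNext + size) + ")");
    }
}
```

By its own timestamp, the element printed as timestamp=1482332958929 belongs to [1482332940000, 1482332960000). The next element, 1482332963995, falls in [1482332960000, 1482332980000), which is the window where the first element was actually printed. That fits an element being carried forward with a later element's timestamp. It does not fit late data: a late element is dropped, or kept in its own window within the allowed lateness. It is never moved forward, which would explain why allowedLateness made no difference.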