Yes, that's true.

On Fri, 27 Jan 2017 at 13:16 Nico <nico.franz...@gmail.com> wrote:

> Hi Aljoscha,
>
> got it!!! :) Thank you. So, in order to retain the "original" timestamps,
> it would be necessary to assign the timestamps after the MapFunction
> instead of at the Kafka source? At least, this solves the issue in the example.
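A minimal plain-Java sketch of that idea, with no Flink dependency (the class and method names are hypothetical). It keeps the delay-by-one logic of MapTest, but takes the timestamp of each emitted element from its own f1 field, which is what a TimestampAssigner placed after the MapFunction would do. Applied to the nine example timestamps, every value then lands in the 20-second window of its own timestamp. The last element is still sitting in state and is never emitted in this example.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical plain-Java model, not Flink code: MapTest's delay-by-one logic,
// followed by re-extracting the timestamp from the emitted element itself.
final class ReassignAfterMapModel {

    // Start of the tumbling window containing a non-negative timestamp (no offset).
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    // Returns window start -> f1 values that fall into that window.
    static Map<Long, Set<Long>> simulate(long[] inputTimestamps, long windowSize) {
        Map<Long, Set<Long>> windows = new TreeMap<>();
        Long stored = null; // stands in for the ValueState "lastEvent"
        for (long current : inputTimestamps) {
            // Like MapTest: emit the stored element, or the current one if the state is empty.
            long emitted = (stored == null) ? current : stored;
            stored = current;
            // Timestamp re-extracted from the emitted element (its f1), not inherited.
            windows.computeIfAbsent(windowStart(emitted, windowSize), k -> new TreeSet<>())
                   .add(emitted);
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] input = {
            1485446260994L, 1485446266012L, 1485446271031L,
            1485446276040L, 1485446281045L, 1485446286049L,
            1485446291062L, 1485446296066L, 1485446302019L
        };
        long size = 20_000L; // 20-second tumbling windows
        simulate(input, size).forEach((start, values) ->
            System.out.println("Window [" + start + " " + (start + size) + "] Set " + values));
    }
}
```

In the real job this would correspond to calling assignTimestampsAndWatermarks(...) on the stream returned by .map(new MapTest()). In this model the re-extracted timestamps never decrease, so a bound of Time.seconds(0) would not make any of them late; with reordering logic such as the BearingMap swap, that may no longer hold.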
>
> Best,
> Nico
>
> 2017-01-27 11:49 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>
> Now I see. What you're doing in this example is basically reassigning
> timestamps to other elements in your stateful MapFunction. Flink internally
> keeps track of the timestamp of an element. This normally cannot be
> changed, except by using a TimestampAssigner, which you're doing. Now, the
> output from a MapFunction has the same timestamp as the input element. By
> keeping an element in state and emitting it when the next element arrives,
> you emit it with the timestamp of that next element, and that's the reason
> why they end up in the "wrong" windows.
>
> Does that help?
>
> -
> Aljoscha
>
> On Thu, 26 Jan 2017 at 19:17 Nico <nico.franz...@gmail.com> wrote:
>
> Hi,
>
> can anyone help me with this problem? I don't get it. Forget the examples
> below; I've created a copy/paste example that reproduces the problem of
> incorrect results when using key-value state and a window operator.
>
>
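> import java.util.HashSet;
> import java.util.Set;
>
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
>
> public class StreamingJob {
>
>     public static void main(String[] args) throws Exception {
>         // set up the streaming execution environment
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>         DataStream<Tuple2<String, Long>> stream = env.fromElements(
>                 new Tuple2<>("1", 1485446260994L),
>                 new Tuple2<>("1", 1485446266012L),
>                 new Tuple2<>("1", 1485446271031L),
>                 new Tuple2<>("1", 1485446276040L),
>                 new Tuple2<>("1", 1485446281045L),
>                 new Tuple2<>("1", 1485446286049L),
>                 new Tuple2<>("1", 1485446291062L),
>                 new Tuple2<>("1", 1485446296066L),
>                 new Tuple2<>("1", 1485446302019L)
>         );
>
>         stream
>                 // event time = f1, no out-of-orderness allowed
>                 .assignTimestampsAndWatermarks(
>                         new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(0)) {
>                             @Override
>                             public long extractTimestamp(Tuple2<String, Long> stringLongTuple2) {
>                                 return stringLongTuple2.f1;
>                             }
>                         })
>                 .keyBy("f0")
>                 .map(new MapTest())
>                 .keyBy("f0")
>                 .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>                 .apply(new WindowFunction<Tuple2<String, Long>, Object, Tuple, TimeWindow>() {
>                     @Override
>                     public void apply(Tuple tuple, TimeWindow timeWindow,
>                                       Iterable<Tuple2<String, Long>> iterable,
>                                       Collector<Object> collector) throws Exception {
>
>                         // collect the f1 values of all elements assigned to this window
>                         Set<Long> set = new HashSet<>();
>                         for (Tuple2<String, Long> t : iterable) {
>                             set.add(t.f1);
>                         }
>
>                         StringBuilder sb = new StringBuilder();
>                         sb.append("Window [" + timeWindow.getStart() + " " + timeWindow.getEnd() + "] ");
>                         sb.append("Set " + set.toString());
>                         System.out.println(sb.toString());
>                     }
>                 })
>                 .print();
>
>         // execute program
>         env.execute("Flink Streaming Java API Skeleton");
>     }
>
>     // Emits the previously stored element for the key and stores the current one.
>     private static class MapTest extends
>             RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
>
>         private transient ValueState<Tuple2<String, Long>> state;
>
>         @Override
>         public Tuple2<String, Long> map(Tuple2<String, Long> stringLongTuple2)
>                 throws Exception {
>
>             Tuple2<String, Long> t = state.value();
>
>             state.update(stringLongTuple2);
>
>             // first element for this key: pass it through unchanged
>             if (t == null) return stringLongTuple2;
>
>             return t;
>         }
>
>         @Override
>         public void open(Configuration parameters) throws Exception {
>
>             ValueStateDescriptor<Tuple2<String, Long>> vsd = new ValueStateDescriptor<>(
>                     "lastEvent",
>                     TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}),
>                     null
>             );
>
>             state = getRuntimeContext().getState(vsd);
>         }
>     }
> }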
>
>
> Output:
>
> Window [1485446260000 1485446280000] Set [1485446271031, 1485446260994, 1485446266012]
> Window [1485446280000 1485446300000] Set [1485446291062, 1485446281045, 1485446286049, 1485446276040]
> Window [1485446300000 1485446320000] Set [1485446296066]
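This output can be reproduced without Flink. The sketch below is a hypothetical plain-Java model of the job (class and method names are not from the thread), assuming, as Aljoscha explains, that a map output keeps the timestamp of the input element that triggered the map call. Sets are printed in sorted order here, while the job's HashSet order is arbitrary, but the contents match the three windows above.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical plain-Java model of the StreamingJob above, not Flink code.
// MapTest emits the previously stored element, but the emitted record keeps
// the timestamp of the element that triggered the map call.
final class DelayedEmitModel {

    static final long WINDOW_SIZE_MS = 20_000L; // TumblingEventTimeWindows.of(Time.seconds(20))

    // Start of the tumbling window containing a non-negative timestamp (no offset).
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    // Returns window start -> set of f1 values that end up in that window.
    static Map<Long, Set<Long>> simulate(long[] inputTimestamps) {
        Map<Long, Set<Long>> windows = new TreeMap<>();
        Long stored = null; // plays the role of the ValueState "lastEvent"
        for (long current : inputTimestamps) {
            // Value returned by MapTest: the stored element, or the current one if state is empty.
            long emittedValue = (stored == null) ? current : stored;
            stored = current;
            // The record carries the timestamp of the current input element.
            long carriedTimestamp = current;
            windows.computeIfAbsent(windowStart(carriedTimestamp, WINDOW_SIZE_MS), k -> new TreeSet<>())
                   .add(emittedValue);
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] input = {
            1485446260994L, 1485446266012L, 1485446271031L,
            1485446276040L, 1485446281045L, 1485446286049L,
            1485446291062L, 1485446296066L, 1485446302019L
        };
        // Window [1485446260000 1485446280000] Set [1485446260994, 1485446266012, 1485446271031]
        // Window [1485446280000 1485446300000] Set [1485446276040, 1485446281045, 1485446286049, 1485446291062]
        // Window [1485446300000 1485446320000] Set [1485446296066]
        simulate(input).forEach((start, values) ->
            System.out.println("Window [" + start + " " + (start + WINDOW_SIZE_MS) + "] Set " + values));
    }
}
```

For example, 1485446276040 is emitted only when 1485446281045 arrives, so it carries a timestamp in [1485446280000, 1485446300000) and is counted in that window.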
>
> Best,
> Nico
>
> BTW ... I am using Flink 1.1.3.
>
>
> 2017-01-16 12:18 GMT+01:00 Nico <nico.franz...@gmail.com>:
>
> Hi Aljoscha,
>
> I was able to identify the root cause of the problem. It is my first map
> function, the one that uses ValueState. But first of all,
> assignTimestampsAndWatermarks() is called right after the Kafka connector is
> created:
>
> FlinkKafkaConsumer09<Car> carFlinkKafkaConsumer09  =
>       new FlinkKafkaConsumer09<>("Traffic", new Car(), properties);
>
> // Extract the timestamps with a max. delay of 2s
> carFlinkKafkaConsumer09.assignTimestampsAndWatermarks(new TimestampGenerator(Time.seconds(0)));
>
> In the map function I try to calculate the direction between two GPS data 
> points. For this, I store the last event in ValueState. The function looks 
> like this:
>
> private static class BearingMap extends RichMapFunction<Car, Car> {
>
>    private transient ValueState<Car> state;
>    private final double maxdiff = 12; // in seconds
>
>    @Override
>    public Car map(Car destination) throws Exception {
>
>       Car origin = state.value();
>       double olat, olon, dlat, dlon;
>
>       /*
>        * If the state is empty, do not compute a direction; only store the
>        * event in the state.
>        */
>       if (origin == null) {
>          state.update(destination);
>          // return the car unchanged
>          return destination;
>       }
>
>       double diff = origin.getTimestamp() - destination.getTimestamp();
>
>       // "Differenz" = "difference"
>       System.out.println("Differenz: " + diff);
>
>       if (Math.abs(diff) <= maxdiff * 1000) {
>
>          /*
>           * Late events that still fall within the allowed delay
>           */
>          if (diff > 0) {
>             Car tmp = destination;
>             destination = origin;
>             origin = tmp;
>          }
>
>          /*
>           * Car tmp is always the origin
>           */
>
>          double bearing = Helper.calculateBearing(
>                origin.getLat(), origin.getLon(), destination.getLat(), destination.getLon());
>
>          // Update the state
>          state.update(destination);
>
>          origin.setDirection(bearing);
>          return origin;
>       }
>
>       // For events that are too late, keep the current state and return it
>       // without a direction
>       return origin;
>    }
>
>
>    @Override
>    public void open(Configuration parameters) throws Exception {
>
>       ValueStateDescriptor<Car> vsd = new ValueStateDescriptor<>(
>             "lastEvent",
>             Car.class,
>             null
>       );
>
>       state = getRuntimeContext().getState(vsd);
>    }
>
> }
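Helper.calculateBearing is not shown in the thread. A common choice for the direction between two GPS points is the initial great-circle bearing (forward azimuth); the sketch below is a hypothetical stand-in with the same argument order (lat1, lon1, lat2, lon2 in degrees), not Nico's actual helper.

```java
// Hypothetical stand-in for Helper.calculateBearing (its real implementation is
// not in the thread): initial great-circle bearing from point 1 to point 2,
// in degrees clockwise from north, normalized to [0, 360).
final class BearingSketch {

    static double calculateBearing(double lat1, double lon1, double lat2, double lon2) {
        double phi1 = Math.toRadians(lat1);
        double phi2 = Math.toRadians(lat2);
        double deltaLambda = Math.toRadians(lon2 - lon1);

        double y = Math.sin(deltaLambda) * Math.cos(phi2);
        double x = Math.cos(phi1) * Math.sin(phi2)
                 - Math.sin(phi1) * Math.cos(phi2) * Math.cos(deltaLambda);

        // atan2 returns a value in (-180, 180] degrees; shift it into [0, 360).
        double degrees = Math.toDegrees(Math.atan2(y, x));
        return (degrees + 360.0) % 360.0;
    }

    public static void main(String[] args) {
        System.out.println(calculateBearing(0, 0, 0, 1)); // approximately 90 (due east)
        System.out.println(calculateBearing(0, 0, 1, 0)); // approximately 0 (due north)
    }
}
```

Because the formula depends on which point is the origin, the swap for late events in BearingMap matters: reversing origin and destination changes the result by roughly 180 degrees over short distances.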
>
> Together with the window function:
>
>
> private static class TimeWindowTest implements WindowFunction<Car,
>         Tuple9<Double, Double, Double, Double, Double, Double, Double, Integer, List<String>>,
>         Tuple, TimeWindow> {
>     @Override
>     public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Car> iterable,
>                       Collector<Tuple9<Double, Double, Double, Double, Double, Double,
>                               Double, Integer, List<String>>> collector) throws Exception {
>         // "Zeitfenster" = "time window"
>         String s = "Zeitfenster: " + timeWindow.getStart() + " - " + timeWindow.getEnd() + "\n";
>         Set<Long> timestamps = new HashSet<Long>();
>
>         for (Car c : iterable) {
>             timestamps.add(c.getTimestamp());
>         }
>
>         System.out.println(s + timestamps + "\n\n");
>     }
> }
>
> I use them in the following pipeline:
>
> stream
>       .filter(new FilterFunction<Car>() {
>          @Override
>          public boolean filter(Car car) throws Exception {
>             return car.getId().equals("car.330");
>          }
>       })
>       .keyBy("id")
>       .map(new BearingMap())
>       .keyBy("id")
>       .window(TumblingEventTimeWindows.of(Time.seconds(10)))
>       .apply(new TimeWindowTest());
>
> So when an event e1 arrives at the map operator, it is stored in
> ValueState, and e1 is forwarded only when the next element e2 arrives,
> about 5 seconds later. This produces the following output, in which one
> element is always about 5 seconds before the start of its window:
>
> Differenz: -5013.0
> Differenz: -5014.0
> Zeitfenster: 1484564690000 - 1484564700000 (Window times start and end)
> [1484564686236, 1484564691260]
>
>
> Differenz: -5009.0
> Differenz: -5007.0
> Zeitfenster: 1484564700000 - 1484564710000
> [1484564696273, 1484564701287]
>
>
> Differenz: -5005.0
> Differenz: -5014.0
> Zeitfenster: 1484564710000 - 1484564720000
> [1484564706296, 1484564711303]
>
>
> Best,
>
> Nico
>
>
>
> 2017-01-09 16:10 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>
> Hi,
> I'm assuming you also have the call to assignTimestampsAndWatermarks()
> somewhere in there as well, as in:
>
> stream
>       .assignTimestampsAndWatermarks(new TimestampGenerator()) // or somewhere else in the pipeline
>       .keyBy("id")
>       .map(...)
>       .filter(...)
>       .map(...)
>       .keyBy("areaID")
>       .map(new KeyExtractor())
>       .keyBy("f1.areaID","f0.sinterval")
>       .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>       .apply(new TrafficInformation());
>
> Just checking to make sure. If you have this, we might have to dig a
> little deeper. Could you also please try to print the whole output of
> your apply() method in one go, i.e. collect all the output in a String and
> then make a single call to System.out.println()? It could be that the output
> in the terminal is not completely in order.
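A sketch of that suggestion: build the complete report for one window as a single String and print it with one call. Within one JVM, PrintStream's print and println methods synchronize on the stream, so the text of one call is not interleaved with other print calls, whereas several separate println calls from parallel window instances can interleave. The helper name describe is hypothetical.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Flink: builds the whole report for one
// window so that it can be printed with a single System.out.print call.
final class WindowReport {

    static String describe(long start, long end, Iterable<?> elements) {
        StringBuilder sb = new StringBuilder();
        sb.append("Start: ").append(start).append('\n');
        sb.append("End: ").append(end).append('\n');
        for (Object element : elements) {
            sb.append(element).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Inside WindowFunction.apply(...) this would be called with
        // timeWindow.getStart(), timeWindow.getEnd() and the window's elements.
        System.out.print(describe(1482332960000L, 1482332980000L,
                Arrays.asList("timestamp=1482332958929", "timestamp=1482332963995")));
    }
}
```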
>
> Cheers,
> Aljoscha
>
> On Mon, 2 Jan 2017 at 15:04 Nico <nico.franz...@gmail.com> wrote:
>
> Hi Aljoscha,
>
> thank you for having a look. Actually there is not too much code based on
> timestamps:
>
> stream
>       .keyBy("id")
>       .map(...)
>       .filter(...)
>       .map(...)
>       .keyBy("areaID")
>       .map(new KeyExtractor())
>       .keyBy("f1.areaID","f0.sinterval")
>       .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>       .apply(new TrafficInformation());
>
> The map functions only enrich the data and don't change anything related
> to the timestamp.
>
> The apply function is:
>
> @Override
> public void apply(
>         Tuple key,
>         TimeWindow timeWindow,
>         Iterable<Tuple2<DirectionInterval, Car>> cars,
>         Collector<Tuple3<String, Double, Double>> out) throws Exception {
>
>     System.out.println("Start: " + timeWindow.getStart());
>     System.out.println("End: " + timeWindow.getEnd());
>
>     for (Tuple2<DirectionInterval, Car> t : cars) {
>         System.out.println(t.f1);
>     }
> }
>
> System.out.println(t.f1) prints all information about a car, including the
> embedded timestamp. The timestamp is extracted with this class:
>
> public class TimestampGenerator extends
>         BoundedOutOfOrdernessTimestampExtractor<Car> {
>
>
>     public TimestampGenerator(Time maxOutOfOrderness) {
>         super(maxOutOfOrderness);
>     }
>
>     @Override
>     public long extractTimestamp(Car car) {
>         return car.getTimestamp();
>     }
> }
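For reference, a simplified plain-Java model of how a bounded-out-of-orderness extractor derives its watermark, written to mirror our reading of Flink 1.1's BoundedOutOfOrdernessTimestampExtractor: track the highest timestamp seen so far and use (max - maxOutOfOrderness) as the watermark, never letting it move backwards. This is a hypothetical model for illustration, not the Flink class; in Flink the watermark is also only emitted periodically.

```java
// Hypothetical plain-Java model of a bounded-out-of-orderness watermark; not Flink code.
final class BoundedOutOfOrdernessModel {

    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp = Long.MIN_VALUE;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessModel(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Called per element; the timestamp is taken as-is, like returning car.getTimestamp().
    long onElement(long elementTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, elementTimestamp);
        return elementTimestamp;
    }

    // Watermark = highest timestamp seen so far minus the bound; it never moves backwards.
    long currentWatermark() {
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet
        }
        long candidate = currentMaxTimestamp - maxOutOfOrdernessMs;
        if (candidate > lastEmittedWatermark) {
            lastEmittedWatermark = candidate;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessModel zeroBound = new BoundedOutOfOrdernessModel(0L);
        zeroBound.onElement(1482332952907L);
        zeroBound.onElement(1482332958929L);
        System.out.println(zeroBound.currentWatermark()); // 1482332958929
        zeroBound.onElement(1482332955000L); // hypothetical out-of-order value
        System.out.println(zeroBound.currentWatermark()); // still 1482332958929
    }
}
```

With Time.seconds(0), as used in the thread, the watermark follows the latest timestamp directly, so any element that arrives with a smaller timestamp than one already seen risks being late.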
>
>
> Example output is presented in the previous post... it looks like the
> timestamp is rounded... I am confused :-/
>
> Best,
> Nico
>
> 2016-12-23 19:41 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>
> Hi,
> could you please share code (and example data) for producing this output.
> I'd like to have a look.
>
> Cheers,
> Aljoscha
>
> On Wed, 21 Dec 2016 at 16:29 Nico <nico.franz...@gmail.com> wrote:
>
> Hi @all,
>
> I am using TumblingEventTimeWindows.of(Time.seconds(20)) for testing.
> While doing so, I found a strange behavior (at least to me) in the assignment
> of events to windows.
>
> The first element of a new window actually always belongs to the old
> window. I thought the events were late, but then they would be dropped
> instead of being assigned to the new window. Even with an allowedLateness
> of 10s the behavior remains the same.
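For context on the lateness question, here is a simplified plain-Java model of how, as far as we understand Flink 1.1's window operator, an event-time element is treated: it goes to the window containing its timestamp, and it is dropped only if that window's last timestamp (end - 1) plus the allowed lateness is at or behind the current watermark. The class and method names are hypothetical, and the watermark is passed in explicitly rather than computed.

```java
// Hypothetical model of event-time window assignment and late-element dropping;
// not Flink's WindowOperator.
final class LatenessModel {

    // Start of the tumbling window containing a non-negative timestamp (no offset).
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    // Last timestamp that still belongs to [start, start + windowSize).
    static long windowMaxTimestamp(long start, long windowSize) {
        return start + windowSize - 1;
    }

    // True if the element's window is already past its allowed lateness, i.e. the element is dropped.
    static boolean isDropped(long timestamp, long windowSize, long allowedLateness, long watermark) {
        long maxTs = windowMaxTimestamp(windowStart(timestamp, windowSize), windowSize);
        return maxTs + allowedLateness <= watermark;
    }

    public static void main(String[] args) {
        long size = 20_000L;
        long watermark = 1482332963995L; // e.g. after seeing this timestamp with zero out-of-orderness
        System.out.println(isDropped(1482332958929L, size, 0L, watermark));      // true
        System.out.println(isDropped(1482332958929L, size, 10_000L, watermark)); // false
        System.out.println(isDropped(1482332963995L, size, 0L, watermark));      // false
    }
}
```

In this model a late element is either dropped or, within the allowed lateness, still assigned to the window of its own timestamp; it never moves to the next window. An element that shows up in a later window therefore points to a different timestamp being attached to it, not to lateness.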
>
> I used timeWindow.getStart() and getEnd() to get the boundaries
> of the window.
>
> Can someone explain this?
>
> Best,
> Nico
>
>
> TimeWindows with Elements:
>
> Start: 1482332940000 - End: 1482332960000
> timestamp=1482332952907
>
> Start: 1482332960000 - End: 1482332980000
> timestamp=1482332958929
> timestamp=1482332963995
> timestamp=1482332969027
> timestamp=1482332974039
>
> Start: 1482332980000 - End: 1482333000000
> timestamp=1482332979059
> timestamp=1482332984072
> timestamp=1482332989081
> timestamp=1482332994089
>
> Start: 1482333000000 - End: 1482333020000
> timestamp=1482332999113
> timestamp=1482333004123
> timestamp=1482333009132
> timestamp=1482333014144
>