Btw, why don't you use the max() method? https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/KeyedStream.html#max-java.lang.String-
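Something like the following untested sketch should already give you a running maximum per key. It assumes the x coordinate is also stored as a plain public double field on Fish (here called "x", which is hypothetical), because the field-expression aggregations cannot call getX() on the JTS Point:

// Untested sketch: built-in max aggregation on a keyed stream.
// Assumes Fish has a public double field "x" (hypothetical) mirroring coordinate.getX().
KeyedStream<Fish, Integer> byFish = fishes.keyBy(f -> f.fish_id);
DataStream<Fish> runningMax = byFish.max("x"); // emits the current maximum of "x" for that key on every input record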
See here about the state solution: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html

About your reduce function: you key it by fish_id, if I see it correctly. This will create one result per fish_id. I propose to first map all fish coordinates to a single key and then reduce by this single key (a rough sketch is at the very end of this mail, below your quoted message).

> On 03.10.2019 at 08:26, Komal Mariam <komal.mar...@gmail.com> wrote:
>
> Hello all,
> I'm trying to do a fairly simple task, which is to find the maximum value (Double) received so far in a stream. This is what I implemented:
>
> POJO class:
>
> public class Fish {
>     public Integer fish_id;
>     public Point coordinate; // position
>
>     public Fish() {};
>
>     public Fish(int fish_id, double x, double y) {
>         // assign to fish object
>     }
> }
>
> Java_main.java:
>
> DataStream<ObjectNode> text = env
>     .addSource(new FlinkKafkaConsumer<>("test",
>         new JSONKeyValueDeserializationSchema(false), properties).setStartFromLatest());
>
> DataStream<Fish> fishes = text.map(new MapFunction<ObjectNode, Fish>() {
>     @Override
>     public Fish map(ObjectNode json) throws Exception {
>         return new Fish(
>             json.get("value").get("id").asInt(),
>             json.get("value").get("x_begin").asDouble(),
>             json.get("value").get("y_begin").asDouble());
>     }
> });
>
> I can't seem to apply aggregations on the Point class without extracting the x coordinates into a separate stream, so here are the two methods I have applied:
>
> Method 1: Simple reduce
>
> KeyedStream<Fish, Integer> keyed = fishes.keyBy(s -> s.fish_id);
> keyed.reduce(new ReduceFunction<Fish>() {
>     @Override
>     public Fish reduce(Fish t, Fish t1) throws Exception {
>         if (t.coordinate.getX() > t1.coordinate.getX()) {
>             return t;
>         } else {
>             return t1;
>         }
>     }
> }).map(new MapFunction<Fish, Double>() {
>     @Override
>     public Double map(Fish t) throws Exception {
>         return t.coordinate.getX();
>     }
> }).print();
>
> Result:
> 1> -73.8517632
> 1> -73.851446
> 1> -73.851446
> 1> -73.8505642
> 1> -73.851446 // smaller than previous value!
> 1> -73.851446
> 1> -73.851446
> 1> -73.8505642
> 1> -73.8517632
> 1> -73.851446
> 3> -73.85012
> 3> -73.850212
> 3> -73.851979 // smaller than previous value
> 3> -73.850212
> 3> -73.8512969
> 3> -73.8512969
>
> 1) I'm trying to compute the max, so why do I see smaller values after the latest maximum value? I think this is because the order of the outputs is not preserved the same as the order of the inputs?
> If this is so, how do I ensure that the order is preserved and I only see the latest maximum value?
>
> 2) Another thing I have noticed is that if instance 1 produces the max, say -73.8505642, instance 2 may then produce -73.9064, which is again smaller than the value produced by instance 1. I'm assuming this is because there is no communication between parallel instances, hence they produce two values.
> If this is so, how do I get ONE maximum value from all the parallel instances combined?
>
> Method 2: Using States
>
> public class GetMaximum extends RichMapFunction<Fish, Fish> {
>
>     private transient ValueState<Fish> max;
>
>     @Override
>     public Fish map(Fish input) throws Exception {
>
>         // access the state value
>         Fish currentMaximum = max.value();
>
>         if (input.coordinate.getX() > currentMaximum.coordinate.getX()) {
>             currentMaximum.fish_id = input.fish_id;
>             currentMaximum.coordinate =
>                 (org.locationtech.jts.geom.Point) input.coordinate.clone();
>             max.update(currentMaximum);
>         }
>         return currentMaximum;
>     }
>
>     @Override
>     public void open(Configuration config) {
>         ValueStateDescriptor<Fish> descriptor =
>             new ValueStateDescriptor<>(
>                 "average", // the state name
>                 TypeInformation.of(new TypeHint<Fish>() {}), // type information
>                 new Fish(-100, 0)); // default value of the state, if nothing was set
>         max = getRuntimeContext().getState(descriptor);
>     }
> }
>
> 3) I'm getting the same results as method 1. Isn't ValueState shared between all instances of the same operator?
>
> 4) Out of the two methodologies, which is the better choice?
>
> Would really appreciate your help!
>
> Best Regards,
> Komal
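Regarding your question 2): here is a rough, untested sketch of the "single key" idea from my reply above, reusing the Fish POJO and the fishes stream from your mail. Mapping every record to the same constant key forces all fish through one keyed partition, so the reduce sees every record and emits one running global maximum (at the cost of parallelism 1 for that step):

// Rough, untested sketch: global maximum via a constant key.
DataStream<Fish> globalMax = fishes
    .keyBy(f -> 0) // constant key: all records end up in the same keyed partition
    .reduce(new ReduceFunction<Fish>() {
        @Override
        public Fish reduce(Fish a, Fish b) {
            // keep whichever fish has the larger x coordinate
            return a.coordinate.getX() > b.coordinate.getX() ? a : b;
        }
    });
globalMax.print(); // or map to coordinate.getX() first, as in your method 1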