Hi Jörn! Thanks for your suggestions.
Btw, just a correction: in the Fish class it's "public Point point", not "public Point coordinate".

For the double comparison, I implemented what you suggested and used BigDecimal. I'm still getting the same results, where I see smaller values after the latest maximum value.

I would prefer using the max method, but I need to take the max of the x-value inside the Point object contained in the Fish object (I retrieve it using fish.point.getX()). The max method on a keyed stream takes the field name as a string, e.g. keyedStream.max("point"), so is there a way I can reference the x coordinate inside the point object to max on? Is there a way I can implement something like keyedStream.max(fish.point.getX())?

About your reduce function:
> You execute it by fish_id if I see it correctly. This will create one
> result by fish_id. I propose to map first all fish coordinates under a
> single key and then reduce by this single key.

^Will try this next.

Regards,
Komal

On Thu, 3 Oct 2019 at 16:42, Jörn Franke <jornfra...@gmail.com> wrote:

> Btw. Why don’t you use the max method?
>
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/KeyedStream.html#max-java.lang.String-
>
> See here about the state solution:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html
>
> About your reduce function:
> You execute it by fish_id if I see it correctly. This will create one
> result by fish_id. I propose to map first all fish coordinates under a
> single key and then reduce by this single key.
>
> On 03.10.2019 at 08:26, Komal Mariam <komal.mar...@gmail.com> wrote:
>
> Hello all,
> I'm trying to do a fairly simple task that is to find the maximum value
> (Double) received so far in a stream.
> This is what I implemented:
>
> POJO class:
> public class Fish{
>     public Integer fish_id;
>     public Point coordinate; //position
>
>     public Fish() {};
>
>     public Fish(fish_id,double x, double y) {
>         //assign to fish object
>     }
>
> Java_main.java
> DataStream text = env
>     .addSource(new FlinkKafkaConsumer<>("test",
>         new JSONKeyValueDeserializationSchema(false), properties).setStartFromLatest());
>
> DataStream<Fish> fishes = text.map(new MapFunction<ObjectNode, Fish>() {
>     @Override
>     public Fish map(ObjectNode json) throws Exception {
>         Fish fishes = new Fish(
>             json.get("value").get("id").asInteger(),
>             json.get("value").get("x_begin").asDouble(),
>             json.get("value").get("y_begin").asDouble());
>         return fishes;
>     }
> });
>
> I can't seem to apply aggregations on the Point class without extracting
> the x coordinates in a separate stream so here are the 2 methods I have
> applied:
>
> Method 1: Simple reduce
>
> KeyedStream<Fish, Integer> keyed = fishes.keyBy(s->s.fish_id);
> keyed.reduce(new ReduceFunction<Fish>() {
>     @Override
>     public Fish reduce(Fish t, Fish t1) throws Exception {
>         if (t.X > t1.X) {
>             return t;
>         } else
>             return t1;
>     }
> }).map(new MapFunction<Fish, Double>() {
>     @Override
>     public Double map(Fish t) throws Exception {
>         return t.point.getX();
>     }
> }).print();
>
> Result:
> 1> -73.8517632
> 1> -73.851446
> 1> -73.851446
> 1> -73.8505642
> 1> -73.851446 //smaller than previous value!
> 1> -73.851446
> 1> -73.851446
> 1> -73.8505642
> 1> -73.8517632
> 1> -73.851446
> 3> -73.85012
> 3> -73.850212
> 3> -73.851979 //smaller than previous value
> 3> -73.850212
> 3> -73.8512969
> 3> -73.8512969
>
> *1)* I'm trying to compute the max so why do I see smaller values after
> the latest maximum value. I think this is because order of the outputs is
> not preserved as same as inputs?
> If this is so how do I ensure that the order is preserved and I only see
> the latest maximum value?
>
> *2)* Another thing I have noticed is that if instance 1 produces the max
> say -73.8505642 but then instance 2 would produce -73.9064 which is again
> smaller than the value produced by instance 1. I'm assuming its because
> there is no communication between parallel instances hence they produce two
> value. If this is so how do I get ONE maximum value from all the parallel
> instances combined?
>
> Method 2: Using States
>
> public class GetMaximum extends RichMapFunction<Fish, Fish> {
>
>     private transient ValueState<Fish> max;
>
>     @Override
>     public Fish map(Fish input) throws Exception {
>
>         // access the state value
>         Fish currentMaximum = max.value();
>
>         if (input.point.getX() > currentMaximum.point.getX()) {
>             currentMaximum.objid = input.objid;
>             currentMaximum.point = (org.locationtech.jts.geom.Point) input.point.clone();
>             max.update(currentMaximum);
>         }
>         return currentMaximum;
>     }
>
>     @Override
>     public void open(Configuration config) {
>         ValueStateDescriptor<Fish> descriptor =
>             new ValueStateDescriptor<>(
>                 "average", // the state name
>                 TypeInformation.of(new TypeHint<Fish>() {}), // type information
>                 new Fish(-100,0)); // default value of the state, if nothing was set
>         max = getRuntimeContext().getState(descriptor);
>     }
> }
>
> *3)* I'm getting the same results as method 1. isn't ValueState shared
> between all instances of the same operator?
>
> *4)* Out of the two methodologies which is a better choice?
>
> Would really appreciate your help!
>
> Best Regards,
> Komal
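
PS: To make the reduce suggestion quoted above concrete, here is a minimal plain-Java sketch (no Flink dependency) of what a keyed rolling reduce emits. It assumes the reduce keeps one running maximum per key and emits it for every input, which is how a keyed reduce behaves. The class RollingMaxSketch, the simplified Fish with a bare x field, and the sample ids are hypothetical and only for illustration. It is probably the reason the printed values can drop: each printed value is the maximum for one fish_id, and several fish_ids share the same output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class RollingMaxSketch {

    /** Hypothetical stand-in for the Fish POJO: an id plus the x coordinate only. */
    static final class Fish {
        final int fishId;
        final double x;

        Fish(int fishId, double x) {
            this.fishId = fishId;
            this.x = x;
        }
    }

    /**
     * Mimics a keyed rolling reduce: for every input element, emit the
     * largest x seen so far for that element's key.
     */
    static <K> List<Double> rollingMax(List<Fish> input, Function<Fish, K> keySelector) {
        Map<K, Double> maxPerKey = new HashMap<>();
        List<Double> emitted = new ArrayList<>();
        for (Fish f : input) {
            K key = keySelector.apply(f);
            // Keep the larger of the stored maximum and the new value.
            double max = maxPerKey.merge(key, f.x, Double::max);
            emitted.add(max);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Fish> input = Arrays.asList(
                new Fish(1, -73.8517632),
                new Fish(2, -73.8505642),
                new Fish(1, -73.851446),
                new Fish(2, -73.850212));

        // Keyed by fish_id: the third output is the max for fish 1 only,
        // so it is smaller than the value just emitted for fish 2.
        System.out.println(rollingMax(input, f -> f.fishId));

        // Everything under one constant key: the output never decreases.
        System.out.println(rollingMax(input, f -> 0));
    }
}
```

With the single constant key the emitted sequence is a true running maximum over all fish. In Flink this would also mean that all records go through one parallel instance, which is the price for getting ONE global value.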