Hi, Ńo worries :) You probably need to write your own process function to do exactly that, maybe something like this:
DataStream<Tuple2<Long, Double>> test; DataStream<Tuple3<Long, Double, Double>> max = test.keyBy(0) .process(new KeyedProcessFunction<Tuple, Tuple2<Long, Double>, Tuple3<Long, Double, Double>>() { public ValueState<Double> max; @Override public void open(Configuration parameters) throws Exception { ValueStateDescriptor<Double> descriptor = new ValueStateDescriptor<>("max", TypeInformation.of(new TypeHint<Double>() { })); sum = getRuntimeContext().getState(descriptor); } @Override public void processElement(Tuple2<Long, Double> value, Context ctx, Collector<Tuple3<Long, Double, Double>> out) throws Exception { // ... } }); You need to store max on the state if you care about recovering from failures/restarts without loosing previous max value. Please check the online documentation for ProcessFunction and handling state in Flink :) Piotrek > On 6 Jun 2018, at 15:55, Nicholas Walton <nwal...@me.com> wrote: > > I’m sure I’m being a complete idiot, since this seems so trivial but if > someone could point me in the right direction I’d be very grateful. > > I have a simple data stream [(Int, Double)] keyed on the Int. I can calculate > the running max of the stream no problem using “.max(2)”. But I want to > output the original input together with the running max value as [(Int, > Double, Double)]. I’ve hunted high and low for a means to do something so > trivial. > > Nick Walton