Hi,
No worries :) You probably need to write your own process function to do
exactly that, maybe something like this:
DataStream<Tuple2<Long, Double>> test;

DataStream<Tuple3<Long, Double, Double>> max = test
    .keyBy(0)
    .process(new KeyedProcessFunction<Tuple, Tuple2<Long, Double>, Tuple3<Long, Double, Double>>() {

        // Keyed state holding the running max for the current key.
        public ValueState<Double> max;

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Double> descriptor =
                new ValueStateDescriptor<>("max", TypeInformation.of(new TypeHint<Double>() {}));
            // Assign the state handle to the field declared above.
            max = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void processElement(
                Tuple2<Long, Double> value,
                Context ctx,
                Collector<Tuple3<Long, Double, Double>> out) throws Exception {
            // ...
        }
    });
You need to store max in Flink state if you care about recovering from
failures/restarts without losing the previous max value. Please check the online
documentation for ProcessFunction and handling state in Flink :)
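
As for what goes into processElement, the logic would roughly be: read the
stored max for the key, compare it with the incoming value, write the larger
one back, and emit the input together with that max. Here is a minimal
plain-Java sketch of just that logic. It is not Flink code: the class
RunningMaxSketch, its Output type and the HashMap standing in for the keyed
ValueState are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch of the running-max logic; not part of Flink.
public final class RunningMaxSketch {

    // Hypothetical output type mirroring Tuple3<Long, Double, Double>.
    public static final class Output {
        public final long key;
        public final double value;
        public final double max;

        Output(long key, double value, double max) {
            this.key = key;
            this.value = value;
            this.max = max;
        }

        @Override
        public String toString() {
            return "(" + key + ", " + value + ", " + max + ")";
        }
    }

    // Assumption: a plain map plays the role of the per-key ValueState<Double>.
    private final Map<Long, Double> maxByKey = new HashMap<>();

    // Handles one element: updates the max for its key and returns the
    // original input together with the running max.
    public Output process(long key, double value) {
        Double previous = maxByKey.get(key); // null for the first element of a key
        double current = (previous == null) ? value : Math.max(previous, value);
        maxByKey.put(key, current);
        return new Output(key, value, current);
    }

    public static void main(String[] args) {
        RunningMaxSketch sketch = new RunningMaxSketch();
        long[] keys = {1L, 1L, 2L, 1L};
        double[] values = {2.0, 1.0, 0.5, 3.0};
        for (int i = 0; i < keys.length; i++) {
            System.out.println(sketch.process(keys[i], values[i]));
        }
    }
}
```

In the real function the map lookup would correspond to max.value(), which
returns null until the first update for a key, the put would correspond to
max.update(...), and the returned object would be passed to out.collect(...)
as a Tuple3.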
Piotrek
> On 6 Jun 2018, at 15:55, Nicholas Walton <[email protected]> wrote:
>
> I’m sure I’m being a complete idiot, since this seems so trivial but if
> someone could point me in the right direction I’d be very grateful.
>
> I have a simple data stream [(Int, Double)] keyed on the Int. I can calculate
> the running max of the stream no problem using “.max(2)”. But I want to
> output the original input together with the running max value as [(Int,
> Double, Double)]. I’ve hunted high and low for a means to do something so
> trivial.
>
> Nick Walton