Hello all,

Suppose I want to hold some state value derived during one transformation, and then use that same state value in a subsequent transformation. For example:
    myStream
        .keyBy(fieldID) // Some field ID, may be 0
        .flatMap(new MyStatefulMapper())
        .flatMap(new MySubsequentMapper())
        ...

Now, I define MyStatefulMapper in the usual fashion:

    public class MyStatefulMapper extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        /**
         * The ValueState handle. The first field is the count, the second field a running sum.
         */
        private transient ValueState<Tuple2<Long, Long>> sum;

        @Override
        public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
            // logic of accessing and updating the ValueState 'sum' above
        }

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                    new ValueStateDescriptor<>(
                            "mySum", // the state name
                            TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                            Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
            sum = getRuntimeContext().getState(descriptor);
        }
    }

So, at this point, the RuntimeContext has registered a state holder named 'mySum'.

In the implementation of MySubsequentMapper, I need to access this state holder named 'mySum', perhaps like this (this is my thinking; I may be wrong):

    public class MySubsequentMapper extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        /**
         * The ValueState handle. The first field is the count, the second field a running sum.
         */
        private transient ValueState<Tuple2<Long, Long>> aSubsequentSum;

        private transient ValueState<Tuple2<Long, Long>> sum; // defined earlier

        @Override
        public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
            // logic of accessing and updating the ValueState 'aSubsequentSum' above,
            // but this logic depends on the current contents of the ValueState 'sum' created earlier
        }

        @Override
        public void open(Configuration config) {
            // Logic to create a ValueStateDescriptor for 'aSubsequentSum', which is owned by this operator
            // ...

            // Question: now, how do I prepare for accessing 'sum', which is a state holder
            // but was created inside an earlier operator?
            sum = getRuntimeContext().getState(descriptor); // how can I pass the name 'mySum' (used in the StateDescriptor)?
        }
    }

I have two questions:

1) Is what I am trying to achieve possible, and is it even advisable? If not, what is the alternative?
2) Is there a guarantee that Flink will always execute MyStatefulMapper.open() before MySubsequentMapper.open(), because of the lexical order in which they appear in the source code?

--
Nirmalya

--
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is where they should be. Now put the foundation under them."
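For reference, here is a minimal, Flink-free sketch of what the 'mySum' state in MyStatefulMapper is described as holding: a per-key count and running sum with a default of (0, 0). It assumes the elided flatMap logic increments the count and adds the incoming value to the sum. The class RunningSumSketch and its update method are hypothetical, and the HashMap only stands in for keyed ValueState; it is not Flink's API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Flink-free model of the per-key 'mySum' state.
// A HashMap stands in for keyed ValueState: one (count, sum) pair per key.
public class RunningSumSketch {
    private final Map<Long, long[]> stateByKey = new HashMap<>();

    // Assumed update logic: read the current (count, sum) for this key,
    // falling back to (0, 0) like the descriptor's default value,
    // then increment the count and add the value to the running sum.
    public long[] update(long key, long value) {
        long[] current = stateByKey.getOrDefault(key, new long[] {0L, 0L});
        long[] updated = {current[0] + 1, current[1] + value};
        stateByKey.put(key, updated);
        return updated;
    }

    public static void main(String[] args) {
        RunningSumSketch sketch = new RunningSumSketch();
        sketch.update(1L, 3L);
        sketch.update(1L, 4L);             // key 1 now holds (2, 7)
        long[] r = sketch.update(2L, 10L); // key 2 starts from the default (0, 0)
        System.out.println(r[0] + "," + r[1]); // prints "1,10"
    }
}
```

Each key keeps its own pair, so updates for key 1 never affect key 2.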