Hi all,

I was trying to implement a join similar to what was laid out in the Flink Forward talk Joining Infinity: Windowless Stream Processing with Flink<https://www.youtube.com/watch?v=UrRQYzux5L0&feature=youtu.be&list=PLDX4T_cnKjD2E_lSDcxOXED59GvXHVKR-> and I have been running into some issues. I am running on 1.2-SNAPSHOT compiled for Scala 2.11 and suspect this may be a regression, so I am including the dev mailing list.

When initializing the value state I receive a null pointer exception:
java.lang.NullPointerException
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:109)
    at com.mediamath.reporting.streaming.FlatMapper$.open(StreamingPipeline.scala:21)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:154)
    at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:49)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:368)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:593)
    at java.lang.Thread.run(Thread.java:745)

Below is a minimal failing example:

import org.apache.flink.api.scala._
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer

object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] {
  val buffer = ArrayBuffer.empty[Long]
  @transient var state: ValueState[String] = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    // this getState call is where the NullPointerException above is thrown
    state = getRuntimeContext.getState(new ValueStateDescriptor[String]("state-descriptor", classOf[String], ""))
  }

  override def flatMap2(value: String, out: Collector[(Long, String)]): Unit = {
    state.update(value)
  }

  override def flatMap1(value: Long, out: Collector[(Long, String)]): Unit = {
    buffer += value
    if (state.value() != "") {
      for (elem ← buffer) {
        out.collect((elem, state.value()))
      }
      buffer.clear()
    }
  }
}

object StreamingPipeline {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(30 * 1000, CheckpointingMode.EXACTLY_ONCE)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setStateBackend(new MemoryStateBackend())

    val pipeline1 = env.generateSequence(0, 1000)
    val pipeline2 = env.fromElements("even", "odd")

    pipeline1.connect(pipeline2)
      .keyBy(
        elem ⇒ elem % 2 == 0,
        elem ⇒ elem == "even"
      ).flatMap(FlatMapper)
      .print()

    env.execute("Example")
  }
}

I also attempted retrieving the state each time I needed it:

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer

object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] {
  val descriptor = new ValueStateDescriptor[String]("state-descriptor", classOf[String], "")
  val buffer = ArrayBuffer.empty[Long]

  override def flatMap2(value: String, out: Collector[(Long, String)]): Unit = {
    // this update() is what trips the precondition linked below
    getRuntimeContext.getState(descriptor).update(value)
  }

  override def flatMap1(value: Long, out: Collector[(Long, String)]): Unit = {
    buffer += value
    val state = getRuntimeContext.getState(descriptor)
    if (state.value() != "") {
      for (elem ← buffer) {
        out.collect((elem, state.value()))
      }
      buffer.clear()
    }
  }
}

object StreamingPipeline {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(30 * 1000, CheckpointingMode.EXACTLY_ONCE)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setStateBackend(new MemoryStateBackend())

    val pipeline1 = env.generateSequence(0, 1000)
    val pipeline2 = env.fromElements("even", "odd")

    pipeline1.connect(pipeline2)
      .keyBy(
        elem ⇒ elem % 2 == 0,
        elem ⇒ elem == "even"
      ).flatMap(FlatMapper)
      .print()

    env.execute("Example")
  }
}

This, however, results in this precondition failing<https://github.com/apache/flink/blob/6f0faf9bb35e7cac3a38ed792cdabd6400fc4c79/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java#L88> on updates.

Seth Wiesman
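P.S. In case it helps anyone reproduce or narrow this down, below is the same flat map written as a plain class instead of a Scala singleton object, so each use goes through a fresh instance. This is only a sketch; I have not verified whether it changes the behaviour, and the logic is otherwise identical to the first example above.

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer

// Same join logic as the first FlatMapper, but defined as a class rather than a
// singleton object. Unverified sketch, meant only to rule the singleton in or out
// as a factor.
class FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] {

  private val buffer = ArrayBuffer.empty[Long]
  @transient private var state: ValueState[String] = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    state = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("state-descriptor", classOf[String], ""))
  }

  override def flatMap2(value: String, out: Collector[(Long, String)]): Unit = {
    // cache the latest value from the second stream in keyed state
    state.update(value)
  }

  override def flatMap1(value: Long, out: Collector[(Long, String)]): Unit = {
    // buffer elements from the first stream until the second stream's value arrives
    buffer += value
    if (state.value() != "") {
      for (elem ← buffer) {
        out.collect((elem, state.value()))
      }
      buffer.clear()
    }
  }
}

The only change in the pipeline would be calling .flatMap(new FlatMapper) instead of .flatMap(FlatMapper).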