Hi,

The problem is this line:

> object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] {

which should use "class" instead of "object". Otherwise, one singleton instance
of the FlatMapper is shared by Flink across multiple operator instances, which
leads to the exceptions you are experiencing.
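
For reference, a minimal sketch of the fix, reusing your example and assuming the
rest of the pipeline stays the same:

// a class, so each parallel operator instance gets its own copy and its own runtime context
class FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] {

  val buffer = ArrayBuffer.empty[Long]

  @transient var state: ValueState[String] = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    // the runtime context is only set per instance, before open() is called
    state = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("state-descriptor", classOf[String], ""))
  }

  override def flatMap2(value: String, out: Collector[(Long, String)]): Unit = {
    state.update(value)
  }

  override def flatMap1(value: Long, out: Collector[(Long, String)]): Unit = {
    buffer += value
    if (state.value() != "") {
      buffer.foreach(elem => out.collect((elem, state.value())))
      buffer.clear()
    }
  }
}

and in the pipeline, pass a fresh instance instead of the singleton:

pipeline1.connect(pipeline2)
  .keyBy(elem => elem % 2 == 0, elem => elem == "even")
  .flatMap(new FlatMapper)
  .print()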

Best,
Stefan

> On 20.10.2016 at 23:22, Seth Wiesman <swies...@mediamath.com> wrote:
> 
> Hi all, 
>  
> I was trying to implement a join similar to what was laid out in the Flink 
> Forward talk Joining Infinity: Windowless Stream Processing with Flink 
> <https://www.youtube.com/watch?v=UrRQYzux5L0&feature=youtu.be&list=PLDX4T_cnKjD2E_lSDcxOXED59GvXHVKR->
> and I have been running into some issues. I am running on 1.2-SNAPSHOT 
> compiled for Scala 2.11 and suspect this may be a regression, so I am 
> including the dev mailing list. When initializing the value state I receive a 
> null pointer exception: 
>  
> java.lang.NullPointerException
>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:109)
>     at com.mediamath.reporting.streaming.FlatMapper$.open(StreamingPipeline.scala:21)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:154)
>     at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:49)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:368)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:593)
>     at java.lang.Thread.run(Thread.java:745)
>  
>  
> Below is a minimal failing example:
>  
> import org.apache.flink.api.scala._
> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.runtime.state.memory.MemoryStateBackend
> import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
> import org.apache.flink.util.Collector
> 
> import scala.collection.mutable.ArrayBuffer
> 
> object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] {
> 
>   val buffer = ArrayBuffer.empty[Long]
> 
>   @transient var state: ValueState[String] = _
> 
>   override def open(parameters: Configuration): Unit = {
>     super.open(parameters)
>     state = getRuntimeContext.getState(
>       new ValueStateDescriptor[String]("state-descriptor", classOf[String], ""))
>   }
> 
>   override def flatMap2(value: String, out: Collector[(Long, String)]): Unit = {
>     state.update(value)
>   }
> 
>   override def flatMap1(value: Long, out: Collector[(Long, String)]): Unit = {
>     buffer += value
> 
>     if (state.value() != "") {
>       for (elem ← buffer) {
>         out.collect((elem, state.value()))
>       }
> 
>       buffer.clear()
>     }
>   }
> }
> 
> object StreamingPipeline {
> 
>   def main(args: Array[String]): Unit = {
> 
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
> 
>     env.enableCheckpointing(30 * 1000, CheckpointingMode.EXACTLY_ONCE)
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     env.setStateBackend(new MemoryStateBackend())
> 
>     val pipeline1 = env.generateSequence(0, 1000)
> 
>     val pipeline2 = env.fromElements("even", "odd")
> 
>     pipeline1.connect(pipeline2)
>       .keyBy(
>         elem ⇒ elem % 2 == 0,
>         elem ⇒ elem == "even"
>       ).flatMap(FlatMapper)
>       .print()
> 
>     env.execute("Example")
>   }
> }
> I also attempted retrieving the state each time I needed it:
>  
> import org.apache.flink.api.common.state.ValueStateDescriptor
> import org.apache.flink.api.scala._
> import org.apache.flink.runtime.state.memory.MemoryStateBackend
> import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
> import org.apache.flink.util.Collector
> 
> import scala.collection.mutable.ArrayBuffer
> 
> object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] {
>   val descriptor = new ValueStateDescriptor[String]("state-descriptor", classOf[String], "")
> 
>   val buffer = ArrayBuffer.empty[Long]
> 
>   override def flatMap2(value: String, out: Collector[(Long, String)]): Unit = {
>     getRuntimeContext.getState(descriptor).update(value)
>   }
> 
>   override def flatMap1(value: Long, out: Collector[(Long, String)]): Unit = {
>     buffer += value
> 
>     val state = getRuntimeContext.getState(descriptor)
>     
>     if (state.value() != "") {
>       for (elem ← buffer) {
>         out.collect((elem, state.value()))
>       }
> 
>       buffer.clear()
>     }
>   }
> }
> 
> object StreamingPipeline {
> 
>   def main(args: Array[String]): Unit = {
> 
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
> 
>     env.enableCheckpointing(30 * 1000, CheckpointingMode.EXACTLY_ONCE)
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     env.setStateBackend(new MemoryStateBackend())
> 
>     val pipeline1 = env.generateSequence(0, 1000)
> 
>     val pipeline2 = env.fromElements("even", "odd")
> 
>     pipeline1.connect(pipeline2)
>       .keyBy(
>         elem ⇒ elem % 2 == 0,
>         elem ⇒ elem == "even"
>       ).flatMap(FlatMapper)
>       .print()
> 
>     env.execute("Example")
>   }
> 
> }
>  
> but this results in this precondition failing
> <https://github.com/apache/flink/blob/6f0faf9bb35e7cac3a38ed792cdabd6400fc4c79/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java#L88>
> on updates. 
>  
> Seth Wiesman
