Hi All, I'm trying to create some experiment with rich windowing function and operator state. I modify the streaming stock prices from
https://github.com/mbalassi/flink/blob/stockprices/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala I create the simple windowing function like below class MyWindowFunction extends RichWindowMapFunction[StockPricex, StockPricex] { println("created") private var counter = 0 override def open(conf: Configuration): Unit = { println("opened") } override def mapWindow(values: Iterable[StockPricex], out: Collector[StockPricex]): Unit = { // if not initialized .. println(counter) println(values) counter = counter + 1 } } However the open() method is not invoked when i try to run this code on my local environment spx .groupBy(x => x.symbol) .window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(1, TimeUnit.SECONDS)) .mapWindow(new MyWindowFunction) Any thought on this one ? Cheers -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Open-method-is-not-called-with-custom-implementation-RichWindowMapFunction-tp1924.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.