Hi All, 

I'm experimenting with rich windowing functions and
operator state. I modified the streaming stock prices example from

https://github.com/mbalassi/flink/blob/stockprices/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala

I created a simple windowing function like the one below:

class MyWindowFunction extends RichWindowMapFunction[StockPricex, StockPricex] {
  println("created")
  private var counter = 0

  override def open(conf: Configuration): Unit = {
    println("opened")
  }

  override def mapWindow(values: Iterable[StockPricex], out: Collector[StockPricex]): Unit = {
    // if not initialized ..

    println(counter)
    println(values)
    counter = counter + 1

  }
}

However, the open() method is not invoked when I run this code in my
local environment:

    spx
      .groupBy(x => x.symbol)
      .window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(1, TimeUnit.SECONDS))
      .mapWindow(new MyWindowFunction)

Any thoughts on this one?


Cheers


