Alright, it works perfectly. I checked that my Python methods are properly executed inside RichWindowFunction. Thanks a lot!
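One follow-up for anyone who finds this thread later: as Chesnay points out below, this only works if prediction.py is available on every TaskManager. I haven't tested this part yet, but Flink's distributed cache looks like one way to ship the script together with the job. Take the following as a rough, untested sketch rather than working code: the cache name "prediction.py", the placeholder path, the @transient marker, and the close() cleanup are my own additions, and registerCachedFile on the streaming environment may depend on your Flink version.
--------
import java.io.File

import jep.Jep
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.RichAllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

class WindowFunction extends RichAllWindowFunction[String, String, GlobalWindow] {
  // Transient, so the closure cleaner never tries to serialize the interpreter.
  @transient private var jep: Option[Jep] = None

  override def open(parameters: Configuration): Unit = {
    // The file registered under "prediction.py" is copied to each TaskManager
    // by the distributed cache and can be resolved locally here.
    val script: File = getRuntimeContext.getDistributedCache.getFile("prediction.py")
    jep = Some(new Jep())
    jep.foreach(_.runScript(script.getAbsolutePath))
  }

  override def apply(window: GlobalWindow, iter: Iterable[String], out: Collector[String]): Unit = {
    // ... call the Python prediction function through jep here ...
  }

  override def close(): Unit = {
    // Release the embedded CPython interpreter when the task shuts down.
    jep.foreach(_.close())
    jep = None
  }
}

object main {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Ship the local script with the job so open() can find it on every node.
    env.registerCachedFile("file:///path/to/prediction.py", "prediction.py")
    env.socketTextStream("localhost", 9999)
      .countWindowAll(5, 1)
      .apply(new WindowFunction())
      .print()
    env.execute()
  }
}
--------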
p.s. For those who wonder why I use Jep, refer to https://sushant-hiray.me/posts/python-in-scala-stack/ to grasp the idea of using Python inside Java through Jep instead of Jython and JyNI.

------------
class WindowFunction extends RichAllWindowFunction[String, String, GlobalWindow] {
  var jep: Option[Jep] = None

  override def open(parameters: Configuration): Unit = {
    // Jep is created here, after the function has been deserialized on the
    // TaskManager, so it is never part of the serialized closure.
    jep = Some(new Jep())
    jep.foreach(_.runScript("prediction.py"))
  }

  override def apply(window: GlobalWindow, iter: Iterable[String], out: Collector[String]): Unit = {
    ...
  }
}
------------

> On Mar 15, 2017, at 1:27 AM, Chesnay Schepler <ches...@apache.org> wrote:
>
> Hey,
>
> Naturally this would imply that your script is available on all nodes, so
> you will have to distribute it manually.
>
> On 14.03.2017 17:23, Chesnay Schepler wrote:
>> Hello,
>>
>> I would suggest implementing the RichWindowFunction instead, and instantiating
>> Jep within open(), or maybe doing some lazy instantiation within apply().
>>
>> Regards,
>> Chesnay
>>
>> On 14.03.2017 15:47, 김동원 wrote:
>>> Hi all,
>>>
>>> What is the proper way to call a Python function in WindowFunction.apply()?
>>>
>>> I want to apply a Python function to values in a fixed-size sliding window.
>>> I'm trying it because:
>>> - I'm currently working on time-series prediction using deep learning,
>>>   which is why I need a sliding window to get the latest N items from the
>>>   unbounded data stream.
>>> - I already have a DNN written using Keras on top of Theano (Keras and
>>>   Theano are Python libraries) in order to exploit Nvidia's CUDA library.
>>> - There is no Python DataStream API, so I tried to use the Scala DataStream API.
>>> - PySpark's structured streaming does not allow me to define a UDAF (see a
>>>   question I posted on Stack Overflow about it:
>>>   http://stackoverflow.com/questions/42747236/how-to-define-udaf-over-event-time-windows-in-pyspark-2-1-0)
>>> - Spark's DStream API does not look promising for this case due to the lack of
>>>   support for count windows.
>>>
>>> For these reasons, I put together a quick toy example to test the feasibility
>>> of applying Python methods to values in the sliding window.
>>> --------
>>> import jep.Jep
>>> import org.apache.flink.streaming.api.scala._
>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>> import org.apache.flink.util.Collector
>>> import org.apache.flink.streaming.api.scala.function.AllWindowFunction
>>> import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
>>>
>>> class WindowFunction extends AllWindowFunction[String, String, GlobalWindow] {
>>>   val jep = new Jep()
>>>   jep.runScript("prediction.py")
>>>
>>>   override def apply(window: GlobalWindow, iter: Iterable[String], out: Collector[String]): Unit = {
>>>     // ...
>>>   }
>>> }
>>>
>>> object main {
>>>   def main(args: Array[String]): Unit = {
>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>     env.socketTextStream("localhost", 9999)
>>>       .countWindowAll(5, 1)
>>>       .apply(new WindowFunction())
>>>       .print()
>>>     env.execute()
>>>   }
>>> }
>>> --------
>>>
>>> Now I'm facing a serialization error with the following messages:
>>> --------
>>> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
>>>     at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
>>>     at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
>>>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:666)
>>>     at org.apache.flink.streaming.api.scala.AllWindowedStream.clean(AllWindowedStream.scala:568)
>>>     at org.apache.flink.streaming.api.scala.AllWindowedStream.apply(AllWindowedStream.scala:315)
>>>     at main$.main(main.scala:23)
>>>     at main.main(main.scala)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:497)
>>>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>>> Caused by: java.io.NotSerializableException: jep.Jep
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:317)
>>>     at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
>>>     ... 11 more
>>> --------
>>>
>>> Apparently, the source of the problem is the third-party library Jep, which helps call Python scripts.
>>> Do I have to make the third-party library serializable?
>>> Or is there a way to handle this in a totally different way in Flink?
>>>
>>> Any help (even with frameworks other than Flink) will be appreciated :-)
>>> Thank you.
>>>
>>> - Dongwon
>>
>