I have gone through this post <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WindowedStream-operation-questions-td6006.html> , where Aljoscha explains that /mapping/ on WindowedStream is /not/ allowed.
So, I think I haven't asked the question properly. Here is (hopefully) a better and easier version: 1. I begin with records of type RawMITSIMTuple. 2. When I group them using a Window, I get an AllWindowedStream[RawMITSIMTuple]. 3. I /fold/ the tuples obtained in the Window, which gives me a DataStream[Vector[RawMITSIMTuple]. 4. What I need is a DataStream[PositionReport]. So, I need to flatMap the output of previous step, where I first get hold of each of the RawMITSIMTuple and map that to PositionReport. val positionReportStream = this .readRawMITSIMTuplesInjected(envDefault,args(0)) .assignAscendingTimestamps(e => e.timeOfReport) .windowAll(TumblingEventTimeWindows.of(Time.seconds(30))) .fold(Vector[RawMITSIMTuple]())((collectorBin,rawRecord) => { collectorBin :+ rawRecord) }) .flatMap(r => r.map(e => this.preparePositionReport(e))) This gives me what I want, but I feel this is verbose and inefficient. Am I thinking correctly? If so, what is a better idiom to use in such cases? -- Nirmalya -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Clarification-use-of-AllWindowedStream-apply-function-tp11627p11630.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.