I have gone through this post
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WindowedStream-operation-questions-td6006.html>,
where Aljoscha explains that /mapping/ on a WindowedStream is /not/ allowed.
So, I think I haven't asked the question properly. Here is (hopefully) a
better and easier version:
1. I begin with records of type RawMITSIMTuple.
2. When I group them using a Window, I get an
AllWindowedStream[RawMITSIMTuple].
3. I /fold/ the tuples obtained in the Window, which gives me a
DataStream[Vector[RawMITSIMTuple]].
4. What I need is a DataStream[PositionReport]. So, I need to flatMap the
output of the previous step: I first get hold of each RawMITSIMTuple
in the Vector and then map it to a PositionReport.
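To make the shapes in these four steps concrete, here is a minimal sketch that uses plain Scala collections as a stand-in for the Flink streams. It is only a model, not Flink code: the case class fields (apart from timeOfReport), the helper tumblingWindows, and the assumption that timeOfReport is in seconds are all made up for illustration.

```scala
// Hypothetical stand-ins for the real types. Only timeOfReport comes from
// my actual code; vehicleId is an assumption made for this sketch.
final case class RawMITSIMTuple(timeOfReport: Long, vehicleId: Int)
final case class PositionReport(time: Long, vehicleId: Int)

object WindowShapes {
  // Stand-in for this.preparePositionReport (the real body is not shown here).
  def preparePositionReport(raw: RawMITSIMTuple): PositionReport =
    PositionReport(raw.timeOfReport, raw.vehicleId)

  // Step 2: group records into tumbling windows of sizeSeconds, assuming
  // timeOfReport is a non-negative timestamp in seconds.
  def tumblingWindows(records: Seq[RawMITSIMTuple],
                      sizeSeconds: Long): Seq[Seq[RawMITSIMTuple]] =
    records.groupBy(_.timeOfReport / sizeSeconds).toSeq.sortBy(_._1).map(_._2)

  // Steps 3 and 4 as I do them now: fold each window into a Vector,
  // then unpack every Vector and map its records one by one.
  def viaFold(records: Seq[RawMITSIMTuple]): Seq[PositionReport] =
    tumblingWindows(records, 30L)
      .map(w => w.foldLeft(Vector[RawMITSIMTuple]())((bin, r) => bin :+ r))
      .flatMap(v => v.map(preparePositionReport))

  // The same result without the intermediate Vector: map the records of
  // each window directly.
  def direct(records: Seq[RawMITSIMTuple]): Seq[PositionReport] =
    tumblingWindows(records, 30L).flatMap(_.map(preparePositionReport))
}
```

In this model the fold only copies each window into a Vector that is taken apart again right away, which is why the two functions return the same reports. The actual Flink job follows.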
val positionReportStream = this
  .readRawMITSIMTuplesInjected(envDefault, args(0))
  .assignAscendingTimestamps(e => e.timeOfReport)
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
  // Collect every record of the window into one Vector
  .fold(Vector[RawMITSIMTuple]())((collectorBin, rawRecord) => {
    collectorBin :+ rawRecord
  })
  // Unpack each Vector and convert its records one by one
  .flatMap(r => r.map(e => this.preparePositionReport(e)))
This gives me what I want, but I feel this is verbose and inefficient. Am I
thinking correctly? If so, what is a better idiom to use in such cases?
-- Nirmalya
--
View this message in context:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Clarification-use-of-AllWindowedStream-apply-function-tp11627p11630.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.