I have gone through this post:
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WindowedStream-operation-questions-td6006.html>
where Aljoscha explains that /mapping/ on a WindowedStream is /not/ allowed.

So, I think I haven't asked the question properly. Here is (hopefully) a
better and easier version:

1.    I begin with records of type RawMITSIMTuple. 
2.    When I group them using a Window, I get an
AllWindowedStream[RawMITSIMTuple].
3.    I /fold/ the tuples obtained in the Window, which gives me a
DataStream[Vector[RawMITSIMTuple]].
4.    What I need is a DataStream[PositionReport]. So, I need to flatMap the
output of the previous step: I take each RawMITSIMTuple out of the Vector
and map it to a PositionReport.

val positionReportStream = this
      .readRawMITSIMTuplesInjected(envDefault,args(0))
      .assignAscendingTimestamps(e => e.timeOfReport)
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
      .fold(Vector[RawMITSIMTuple]())((collectorBin, rawRecord) => {
          collectorBin :+ rawRecord
        })
      .flatMap(r => r.map(e => this.preparePositionReport(e)))

This gives me what I want, but I feel this is verbose and inefficient. Am I
thinking correctly? If so, what is a better idiom to use in such cases?
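
To make the concern concrete, here is a minimal sketch in plain Scala collections (no Flink involved) of what the fold + flatMap does to the contents of one window. The case class fields and the body of preparePositionReport are made up for illustration; only the type names come from my job. As far as I can tell, the fold only copies the records into a Vector and the flatMap then unpacks it again, so the result matches a direct per-record map:

```scala
// Hypothetical stand-ins for the real types; the fields are assumptions.
final case class RawMITSIMTuple(timeOfReport: Long, vehicleId: Int, speed: Int)
final case class PositionReport(timeOfReport: Long, vehicleId: Int, speed: Int)

object WindowFoldSketch {
  // Stand-in for this.preparePositionReport; the real conversion may differ.
  def preparePositionReport(t: RawMITSIMTuple): PositionReport =
    PositionReport(t.timeOfReport, t.vehicleId, t.speed)

  // Models the pipeline for one window: fold the records into a Vector,
  // then flatMap that single Vector back into individual reports.
  def viaFold(window: Seq[RawMITSIMTuple]): Seq[PositionReport] = {
    val folded =
      window.foldLeft(Vector.empty[RawMITSIMTuple])((bin, rec) => bin :+ rec)
    Seq(folded).flatMap(r => r.map(preparePositionReport))
  }

  // The same result without the intermediate Vector.
  def direct(window: Seq[RawMITSIMTuple]): Seq[PositionReport] =
    window.map(preparePositionReport)

  def main(args: Array[String]): Unit = {
    val window = Seq(RawMITSIMTuple(0L, 1, 50), RawMITSIMTuple(10L, 2, 60))
    // Both paths yield the same reports in the same order.
    assert(viaFold(window) == direct(window))
    println(direct(window))
  }
}
```

This only models a single window in memory; it says nothing about how Flink buffers or emits window contents.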

-- Nirmalya




