Hi Nirmalya, what does the compiler say if you use the variant without explicit TypeInfo? Like this:
.fold(
  // Seed
  Map[EWayCoordinates,Set[VehicleID]](),

  // FoldFunction
  folder,

  // WindowFunction
  windower
)

Best,
Aljoscha

On Thu, 23 Feb 2017 at 14:41 nsengupta <sengupta.nirma...@gmail.com> wrote:

> For reasons I cannot grasp, I am unable to move ahead.
>
> Here's the code:
>
> ---------------------------------------------------------------------------
>
> import org.apache.flink.api.common.functions.FoldFunction
> import org.apache.flink.api.java.typeutils.TupleTypeInfo
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
> import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
> import org.apache.flink.util.Collector
> import org.nirmalya.exercise.Elements.{EWayCoordinates, PositionReport, RawMITSIMTuple, VehicleID}
>
> case class EWayCoordinates(eWayID: Int, eWayDir: Int, eWayLane: Int, eWaySegment: Int)
>
> case class VehicleDetails(vehicleID: Int, vehicleSpeed: Int, vehiclePos: Int)
>
> case class PositionReport(
>   // tupletype: Int,
>   timeOfReport: Int,
>   eWayCoordinates: EWayCoordinates,
>   vehicleDetails: VehicleDetails
> )
>
> // ....
>
> val envDefault = StreamExecutionEnvironment.createLocalEnvironment(4)
> envDefault.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
> val readings = IndexedSeq [RawMITSIMTuple] (
>   RawMITSIMTuple(0,0,107,32,0,0,0,10, 53320,-1,-1,-1,-1,-1,-1),
>   RawMITSIMTuple(0,0,109,20,0,0,0,10,100644,-1,-1,-1,-1,-1,-1),
>   RawMITSIMTuple(0,0,106,28,0,0,0,10,137745,-1,-1,-1,-1,-1,-1),
>   RawMITSIMTuple(0,1,107,32,0,0,0,67,354281,-1,-1,-1,-1,-1,-1),
>   RawMITSIMTuple(0,1,105,30,0,0,1,94,501089,-1,-1,-1,-1,-1,-1),
>   RawMITSIMTuple(0,1,110,26,0,0,1,95,495898,-1,-1,-1,-1,-1,-1),
>   RawMITSIMTuple(0,1,102,30,0,0,1,85,453562,-1,-1,-1,-1,-1,-1),
>   RawMITSIMTuple(0,2,104,32,0,0,1,80,427144,-1,-1,-1,-1,-1,-1),
>   RawMITSIMTuple(0,2,102,20,0,0,1,73,390383,-1,-1,-1,-1,-1,-1),
>   RawMITSIMTuple(0,2,111,27,1,0,1,69,369135,-1,-1,-1,-1,-1,-1),
>   RawMITSIMTuple(0,2,112,28,1,0,0, 1,  5757,-1,-1,-1,-1,-1,-1)
> )
>
> val folder = new FoldFunction[PositionReport, Map[EWayCoordinates,Set[Int]]] {
>   override
>   def fold(
>     t: Map[EWayCoordinates, Set[VehicleID]], o: PositionReport
>   ): Map[EWayCoordinates, Set[VehicleID]] = {
>     t + (o.eWayCoordinates -> (t.getOrElse(o.eWayCoordinates,Set.empty) + (o.vehicleDetails.vehicleID)))
>   }
> }
>
> val windower = new AllWindowFunction[Map[EWayCoordinates, Set[VehicleID]],(EWayCoordinates,Int),Window] {
>   override
>   def apply(
>     w: Window,
>     bunch: Iterable[Map[EWayCoordinates, Set[VehicleID]]],
>     collector: Collector[(EWayCoordinates, VehicleID)]): Unit = {
>
>     val allVehiclesInLast30Mins = bunch.iterator().next.mapValues(e => e.size)
>
>     allVehiclesInLast30Mins.foreach(e => println(e))
>
>     collector.collect((EWayCoordinates(-1,-1,-1,-1),0))
>
>   }
> }
>
> val uniqueVehicles = envDefault
>   .fromCollection(readings)
>   .map(e => MITSIMUtils.preparePositionReport(e))
>   .assignAscendingTimestamps(e => e.timeOfReport)
>   .keyBy(e => (
>     e.eWayCoordinates.eWayID,
>     e.eWayCoordinates.eWayDir,
>     e.eWayCoordinates.eWaySegment,
>     e.vehicleDetails.vehicleID))
>   .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
>   .fold(
>     // Seed
>     Map[EWayCoordinates,Set[VehicleID]](),
>
>     // FoldFunction
>     folder,
>
>     // WindowFunction
>     windower,
>
>     // Satisfying the compiler
>     new TupleTypeInfo[Map[EWayCoordinates,Set[VehicleID]]](),
>     new TupleTypeInfo[(EWayCoordinates,Int)]
>   )
>
> ---------------------------------------------------------------------------
>
> The compiler is unhappy:
>
> [ERROR] /home/nirmalya/Workspace-Flink/LinearRoad/src/test/scala/org/nirmalya/exercise/StreamStateTest.scala:136: error: missing argument list for method fold in class AllWindowedStream
> [ERROR] Unapplied methods are only converted to functions when a function type is expected.
> [ERROR] You can make this conversion explicit by writing `fold _` or `fold(_)(_)(_)` instead of `fold`.
> [ERROR] .fold(
> [ERROR] ^
> [ERROR] one error found
>
> ---------------------------------------------------------------------------
>
> I understand why the compiler is unhappy, but I am unsure whether I have
> to go through all this *devilry*. I don't see anything like this
> prescribed in any Flink example. But then, perhaps I am missing an
> important point.
>
> I have been through this comment
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Incremental-aggregations-Example-not-working-td10581.html#a10585>
> by *Yassine Marzougui*, before I added those type hints. But I am using
> *Flink 1.2.0*.
>
> I know this sounds silly, but I am simply failing to get out of this.
>
> All help appreciated.
>
> -- Nirmalya
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Compilation-Error-in-WindowStream-fold-tp11830.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
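
A note on the explicit TypeInfo arguments in the quoted code. This is a minimal sketch under one assumption, not verified here against the 1.2.0 sources: that the Scala `fold` declares its type parameters with context bounds, something like `[ACC: TypeInformation, R: TypeInformation]`. If so, the type information lives in an implicit second argument list, and appending it to the first list cannot match any overload. The sketch uses plain Scala only; `TypeInfo`, `FoldSketch` and `foldAll` are hypothetical stand-ins for illustration, not Flink classes.

```scala
// Hypothetical stand-in for Flink's TypeInformation; NOT the real class.
final case class TypeInfo[T](name: String)

object TypeInfo {
  // Instances the compiler can find on its own (implicit scope of TypeInfo).
  implicit val intInfo: TypeInfo[Int] = TypeInfo("Int")
  implicit def setInfo[A](implicit elem: TypeInfo[A]): TypeInfo[Set[A]] =
    TypeInfo(s"Set[${elem.name}]")
}

object FoldSketch {
  // The context bounds desugar to a trailing implicit parameter list:
  //   foldAll[ACC, R](elements, seed, folder, finish)
  //                  (implicit accInfo: TypeInfo[ACC], rInfo: TypeInfo[R])
  def foldAll[ACC: TypeInfo, R: TypeInfo](
      elements: Seq[Int],
      seed: ACC,
      folder: (ACC, Int) => ACC,
      finish: ACC => R
  ): (R, String) = {
    val result = finish(elements.foldLeft(seed)(folder))
    val types =
      s"${implicitly[TypeInfo[ACC]].name} -> ${implicitly[TypeInfo[R]].name}"
    (result, types)
  }

  def main(args: Array[String]): Unit = {
    val ids   = Seq(107, 109, 107)
    val addId = (acc: Set[Int], id: Int) => acc + id
    val count = (acc: Set[Int]) => acc.size

    // Evidence resolved implicitly: only the "real" arguments are written.
    val (n1, t1) = foldAll(ids, Set.empty[Int], addId, count)
    println(s"$n1 distinct ids, types: $t1")

    // Evidence passed by hand: it goes in a SECOND argument list.
    val (n2, t2) = foldAll(ids, Set.empty[Int], addId, count)(
      TypeInfo[Set[Int]]("custom-acc"), TypeInfo[Int]("custom-result"))
    println(s"$n2 distinct ids, types: $t2")

    // Appending the evidence to the first list does not compile:
    // foldAll(ids, Set.empty[Int], addId, count, TypeInfo[Int]("x"))
  }
}
```

In the quoted program, the `import org.apache.flink.streaming.api.scala._` line is what would normally bring the implicit type information into scope, so the plain three-argument call may well be enough.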