Hello Stefano,
I have tried to implement what I understood from your mail earlier in the
day, but it doesn't seem to produce the result I expect. Here's the code
snippet:
-------------------------------------------------------------------------
val env = StreamExecutionEnvironment.createLocalEnvironment(4)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val readings =
  readIncomingReadings(env, "./sampleIOTTiny.csv")
    .keyBy(_.readingTimeStamp)
    .countWindow(4, 2)

val avgReadings =
  readings
    .apply((key: Long, w: Window, v: Iterable[IncomingDataUnit], out: Collector[Float]) => {
      val readings: Iterable[Float] = v.map(_.ambientTemperature)
      val avg = readings.sum / readings.size
      out.collect(avg)
    }).setParallelism(1)

avgReadings.print()
-------------------------------------------------------------------------
And here's the output:
-------------------------------------------------------------------------
1> 23.67
1> 21.0025
1> 23.79
2> 25.02
2> 23.3425
2> 25.02
3> 26.55
4> 19.970001
3> 18.93375
4> 25.727499
3> 18.93375
4> 25.7075
--------------------------------------------------------------------------
My understanding is that because I have set the parallelism of the
avgReadings transformation to 1 (via setParallelism(1)), it should aggregate
the streams from all 4 earlier windows and then compute a single average
value. Clearly there is a gap in my understanding. Could you please point
out the mistake that I am making?
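
To make the expectation concrete, here is a minimal sketch in plain Scala (no
Flink involved) of the single value I was hoping to get: one average over all
readings, regardless of key. The Reading case class is a hypothetical stand-in
for IncomingDataUnit with only the two fields used above, and the sample
values are made up, not taken from sampleIOTTiny.csv.

```scala
object ExpectedAverage {
  // Hypothetical stand-in for IncomingDataUnit; only the fields used in the snippet.
  final case class Reading(readingTimeStamp: Long, ambientTemperature: Float)

  // One average over every reading, ignoring the key entirely.
  // Returns None for empty input to avoid dividing by zero.
  def overallAverage(readings: Seq[Reading]): Option[Float] =
    if (readings.isEmpty) None
    else Some(readings.map(_.ambientTemperature).sum / readings.size)

  // Made-up sample data: two distinct timestamps (keys), two readings each.
  val sample: Seq[Reading] = Seq(
    Reading(1L, 20.0f),
    Reading(1L, 22.0f),
    Reading(2L, 24.0f),
    Reading(2L, 26.0f)
  )

  def main(args: Array[String]): Unit =
    println(overallAverage(sample)) // prints Some(23.0)
}
```

In other words, I expected a single line of output per window, not one line
per key and per print subtask as shown above.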
Many thanks in advance.
-- Nirmalya
--
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."