Hello Fabian (and others),

Sorry to bring up the much-flogged topic of countWindowAll() again, but I just
want to be sure that I understand it correctly.

For a dataset like the following (partial):

-----------------------------------------

probe-f076c2b0,201,842.53,75.5372,1448028160,29.37
probe-dccefede,199,749.25,78.6057,1448028160,27.46
probe-f29f9662,199,821.81,81.7831,1448028160,22.35
probe-5dac1d9f,195,870.71,83.1028,1448028160,15.98
probe-6c75cfbe,198,830.06,82.5607,1448028160,30.02
probe-4d78b545,204,778.42,78.412,1448028160,25.92
probe-400c5cdf,204,711.65,73.585,1448028160,27.18
...........
-----------------------------------------

The following code:

-----------------------------------------
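import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.createLocalEnvironment(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
env.setParallelism(1)

// readIncomingReadings is my own CSV-reading helper (not shown here)
val readings =
  readIncomingReadings(env, "./sampleIOTTiny.csv")
    .map(e => (e.sensorUUID, e.ambientTemperature))
    .countWindowAll(4, 1)  // window size 4, slide 1
    .maxBy(1)              // reading with the highest temperature per window

readings.print()
env.execute()              // print() only adds a sink; the job runs here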
-------------------------------------------

produces this (partial):

------------------------------------------
(probe-f076c2b0,29.37)
(probe-f076c2b0,29.37)
(probe-f076c2b0,29.37)
(probe-f076c2b0,29.37)
(probe-6c75cfbe,30.02)
(probe-6c75cfbe,30.02)
(probe-6c75cfbe,30.02)
(probe-6c75cfbe,30.02)
(probe-400c5cdf,27.18)
......
------------------------------------------

I am trying to explain the first three lines of the output. When I call
countWindowAll(4,1), am I not telling Flink to '*wait until you have received
at least the first 4 readings before you calculate the maximum*'? Instead, it
appears that Flink calculates the maximum every time a tuple is added to the
window. What, then, is the correct and complete interpretation of this
computation?
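To check my own understanding, here is a plain-Scala simulation (no Flink
involved) of what I suspect is happening. It assumes that
countWindowAll(size, slide) fires after every `slide` new elements and then
evaluates only the most recent `size` elements (I believe it is built from
GlobalWindows with a CountTrigger and a CountEvictor). Under that assumption
the first windows simply hold 1, 2 and 3 readings. CountWindowSim and
slidingCountMax are hypothetical names made up for this sketch, not Flink API.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch: simulates countWindowAll(size, slide) followed by
// maxBy(1), ASSUMING the window fires after every `slide` new elements and
// then evaluates only the most recent `size` elements (older ones are evicted).
object CountWindowSim {

  def slidingCountMax(readings: Seq[(String, Double)],
                      size: Int,
                      slide: Int): List[(String, Double)] = {
    val buffer = ArrayBuffer.empty[(String, Double)] // current window contents
    val out = ArrayBuffer.empty[(String, Double)]    // one result per firing
    var sinceLastFire = 0
    for (r <- readings) {
      buffer += r
      sinceLastFire += 1
      if (sinceLastFire == slide) {  // assumed "trigger": every `slide` elements
        sinceLastFire = 0
        while (buffer.size > size)   // assumed "evictor": keep the last `size`
          buffer.remove(0)
        // like maxBy(1): highest temperature wins, first one kept on ties
        out += buffer.reduceLeft((a, b) => if (b._2 > a._2) b else a)
      }
    }
    out.toList
  }

  def main(args: Array[String]): Unit = {
    // The seven readings shown in the sample data: (sensorUUID, ambientTemperature)
    val sample = List(
      ("probe-f076c2b0", 29.37),
      ("probe-dccefede", 27.46),
      ("probe-f29f9662", 22.35),
      ("probe-5dac1d9f", 15.98),
      ("probe-6c75cfbe", 30.02),
      ("probe-4d78b545", 25.92),
      ("probe-400c5cdf", 27.18)
    )
    slidingCountMax(sample, size = 4, slide = 1).foreach(println)
  }
}
```

Run on the seven readings shown above, it prints (probe-f076c2b0,29.37) four
times and then (probe-6c75cfbe,30.02) three times, which matches the first
seven lines of the Flink output; the later lines depend on readings that are
elided from the sample.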

-- N

-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."