Hello Fabian (and others),

Sorry to bring up the well-flogged topic of countWindowAll() again, but I just want to be sure that I understand it correctly.
For a dataset like the following (partial):
-----------------------------------------
probe-f076c2b0,201,842.53,75.5372,1448028160,29.37
probe-dccefede,199,749.25,78.6057,1448028160,27.46
probe-f29f9662,199,821.81,81.7831,1448028160,22.35
probe-5dac1d9f,195,870.71,83.1028,1448028160,15.98
probe-6c75cfbe,198,830.06,82.5607,1448028160,30.02
probe-4d78b545,204,778.42,78.412,1448028160,25.92
probe-400c5cdf,204,711.65,73.585,1448028160,27.18
...........
-----------------------------------------
The following code:
-----------------------------------------
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.createLocalEnvironment(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
env.setParallelism(1)

// readIncomingReadings is my own CSV-parsing helper (not shown here)
val readings = readIncomingReadings(env, "./sampleIOTTiny.csv")
  .map(e => (e.sensorUUID, e.ambientTemperature))
  .countWindowAll(4, 1)
  .maxBy(1)

readings.print()
env.execute()
-------------------------------------------
produces this (partial):
------------------------------------------
(probe-f076c2b0,29.37)
(probe-f076c2b0,29.37)
(probe-f076c2b0,29.37)
(probe-f076c2b0,29.37)
(probe-6c75cfbe,30.02)
(probe-6c75cfbe,30.02)
(probe-6c75cfbe,30.02)
(probe-6c75cfbe,30.02)
(probe-400c5cdf,27.18)
......
------------------------------------------

I am trying to explain the first three lines of the output. When I call countWindowAll(4,1), am I not telling Flink to '*wait until you have at least the first 4 readings before you calculate the maximum*'? It appears that Flink is computing maxBy() for every incoming tuple it adds to the window. What, then, is the correct and complete interpretation of this computation?

--
N
--
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is where they should be. Now put the foundation under them."
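To make the observed behaviour concrete, here is a minimal plain-Scala sketch (no Flink dependency) built on an assumption: that countWindowAll(size, slide) acts like a single global window where a count trigger fires after every `slide` new elements and an evictor then keeps only the latest `size` elements before the window function runs. The names SlidingCountSketch and slidingMaxBy are hypothetical; this simulates the assumed semantics and is not Flink's own code.

```scala
import scala.collection.mutable

// Hypothetical, Flink-free simulation of the ASSUMED countWindowAll(size, slide)
// semantics: a count trigger fires after every `slide` new elements, and an
// evictor then keeps only the latest `size` elements before the function runs.
object SlidingCountSketch {

  // (sensorUUID, ambientTemperature) pairs taken from the sample rows above.
  val sample: Seq[(String, Double)] = Seq(
    ("probe-f076c2b0", 29.37),
    ("probe-dccefede", 27.46),
    ("probe-f29f9662", 22.35),
    ("probe-5dac1d9f", 15.98),
    ("probe-6c75cfbe", 30.02),
    ("probe-4d78b545", 25.92),
    ("probe-400c5cdf", 27.18)
  )

  // Emits one result per trigger firing: the element with the highest
  // temperature among those left in the window (the earliest one wins on ties).
  def slidingMaxBy(readings: Seq[(String, Double)], size: Int, slide: Int): Seq[(String, Double)] = {
    val window  = mutable.Queue.empty[(String, Double)]
    val results = mutable.ArrayBuffer.empty[(String, Double)]
    var sinceLastFire = 0
    for (r <- readings) {
      window.enqueue(r)
      sinceLastFire += 1
      if (sinceLastFire == slide) {
        // Trigger fires; note there is no "wait until the window is full" check.
        sinceLastFire = 0
        // Evictor: drop the oldest elements so that at most `size` remain.
        while (window.size > size) window.dequeue()
        results += window.reduceLeft((best, next) => if (next._2 > best._2) next else best)
      }
    }
    results.toList
  }

  def main(args: Array[String]): Unit =
    slidingMaxBy(sample, size = 4, slide = 1).foreach(println)
}
```

Under that assumption, running main prints four (probe-f076c2b0,29.37) lines followed by three (probe-6c75cfbe,30.02) lines, which matches the first seven lines of the output above. The first three results would then come from windows holding only one, two and three readings, because the trigger fires on every arrival rather than waiting for four.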