Hello Fabian <fhue...@gmail.com> Merry Christmas to you and everyone else in this forum.
Another neophyte's question, patience please. I have following code: val env = StreamExecutionEnvironment.createLocalEnvironment(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val readings = readIncomingReadings(env,"./sampleIOT.csv") .map(e => (e.timeStamp,e.ambientTemperature)) .assignAscendingTimestamps(pairOfReadings => pairOfReadings._1) .timeWindowAll(Time.milliseconds(3)) .maxBy(1) In the datafile, timestamps are 2nd from the right field (first few records only): probe-42a9ddca,193,819.12,74.3712,1448028161,22.07 probe-252a5bbd,197,710.32,80.6072,1448028161,14.64 probe-987f2cb6,200,750.4,76.0533,1448028161,14.72 probe-24444323,197,816.06,84.0816,1448028161,4.405 probe-6dd6fdc4,201,717.64,78.4031,1448028161,29.43 probe-20c609fb,204,804.37,84.5243,1448028161,22.87 probe-c027fdc9,195,858.61,81.7682,1448028161,24.47 probe-2c6cd3de,198,826.96,85.26,1448028162,18.99 probe-960906ca,197,797.63,77.4359,1448028162,27.62 probe-16226f9e,199,835.5,81.2027,1448028162,18.82 probe-4de4e64b,200,851.04,80.5296,1448028162,27.43 ....... The output is: (1448028163,27.83) (1448028166,32.06) (1448028160,30.02) The contents are correct, but I am not sure about the *order in which they appear*. Because I am using val env = StreamExecutionEnvironment.createLocalEnvironment(1) // only one thread anyway and the timestamps are guaranteed to be in the *ascending order* (I have sorted the CSV before using it), my expectation is that the Flink should print the output as: (1448028160,30.02) (1448028163,27.83) (1448028166,32.06) How do I explain the randomness? -- Nirmalya -- Software Technologist http://www.linkedin.com/in/nirmalyasengupta "If you have built castles in the air, your work need not be lost. That is where they should be. Now put the foundation under them."