Hello Fabian

Merry Christmas to you and everyone else in this forum.

Another neophyte's question, patience please.

I have the following code:

    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time

    // Single-threaded local environment, running on event time
    val env = StreamExecutionEnvironment.createLocalEnvironment(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Read the readings, window them by event time, and keep the
    // maximum temperature (tuple field 1) per window
    val readings =
      readIncomingReadings(env, "./sampleIOT.csv")
        .map(e => (e.timeStamp, e.ambientTemperature))
        .assignAscendingTimestamps(pairOfReadings => pairOfReadings._1)
        .timeWindowAll(Time.milliseconds(3))
        .maxBy(1)


In the data file, the timestamp is the second field from the right (first few
records only):

probe-42a9ddca,193,819.12,74.3712,1448028161,22.07
probe-252a5bbd,197,710.32,80.6072,1448028161,14.64
probe-987f2cb6,200,750.4,76.0533,1448028161,14.72
probe-24444323,197,816.06,84.0816,1448028161,4.405
probe-6dd6fdc4,201,717.64,78.4031,1448028161,29.43
probe-20c609fb,204,804.37,84.5243,1448028161,22.87
probe-c027fdc9,195,858.61,81.7682,1448028161,24.47
probe-2c6cd3de,198,826.96,85.26,1448028162,18.99
probe-960906ca,197,797.63,77.4359,1448028162,27.62
probe-16226f9e,199,835.5,81.2027,1448028162,18.82
probe-4de4e64b,200,851.04,80.5296,1448028162,27.43
.......
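
For reference, readIncomingReadings is roughly the following (just a sketch;
apart from timeStamp and ambientTemperature, the field names are placeholders,
since only those two matter here):

    import org.apache.flink.streaming.api.scala._

    case class IoTReading(probeId: String, field2: Long, field3: Double,
                          field4: Double, timeStamp: Long,
                          ambientTemperature: Double)

    def readIncomingReadings(env: StreamExecutionEnvironment,
                             path: String): DataStream[IoTReading] =
      env.readTextFile(path).map { line =>
        // Split the CSV line; the timestamp is the 5th field,
        // the ambient temperature the 6th
        val f = line.split(",")
        IoTReading(f(0), f(1).toLong, f(2).toDouble, f(3).toDouble,
                   f(4).toLong, f(5).toDouble)
      }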


The output is:

(1448028163,27.83)
(1448028166,32.06)
(1448028160,30.02)

The contents are correct, but I am not sure about the *order in which they
appear*. Because I am using

val env = StreamExecutionEnvironment.createLocalEnvironment(1)  // only one thread anyway


and the timestamps are guaranteed to be in *ascending order* (I sorted the CSV
before using it), my expectation is that Flink should print the output as:

(1448028160,30.02)
(1448028163,27.83)
(1448028166,32.06)
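
To double-check which window each value falls into, I suppose I could replace
maxBy with the all-window apply and emit the window bounds as well (again just
a sketch; maxWithBounds and the output tuple layout are my own choices):

    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector

    // Emit (windowStart, windowEnd, maxTemperature) so that every result
    // shows which window produced it
    val maxWithBounds =
      readIncomingReadings(env, "./sampleIOT.csv")
        .map(e => (e.timeStamp, e.ambientTemperature))
        .assignAscendingTimestamps(_._1)
        .timeWindowAll(Time.milliseconds(3))
        .apply { (window: TimeWindow, readings: Iterable[(Long, Double)],
                  out: Collector[(Long, Long, Double)]) =>
          val hottest = readings.maxBy(_._2)
          out.collect((window.getStart, window.getEnd, hottest._2))
        }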

How do I explain this apparent randomness in the ordering?

-- Nirmalya

-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."
