Till,

Thanks for looking into this.
I have removed the toList() from the collect() call to align the code with what I generally do in a Flink application. Now it throws an exception, and I can't figure out why.

*Here's my code (shortened for brevity):*

    import org.apache.flink.api.scala._
    import scala.io.Source

    case class BuildingInformation(buildingID: Int, buildingManager: Int, buildingAge: Int, productID: String, country: String)

    object HVACReadingsAnalysis {

      def main(args: Array[String]): Unit = {
        val envDefault = ExecutionEnvironment.getExecutionEnvironment
        val buildings = readBuildingInfo(envDefault, "./SensorFiles/building.csv")
        buildings.print()
        envDefault.execute("HVAC Simulation")
      }

      private def readBuildingInfo(env: ExecutionEnvironment, inputPath: String) = {
        // [NS]: I can see the lines, read correctly from the CSV file here
        println("As read from CSV file")
        println(Source.fromFile(inputPath).getLines.toList.mkString("#\n"))

        // [NS]: Then, I read the same file using the library function
        env.readCsvFile[BuildingInformation](
          inputPath,
          ignoreFirstLine = true,
          pojoFields = Array("buildingID", "buildingManager", "buildingAge", "productID", "country")
        )
      }
    }

*Relevant portion of the output:*

    As read from CSV file
    BuildingID,BuildingMgr,BuildingAge,HVACproduct,Country#
    1,M1,25,AC1000,USA#
    2,M2,27,FN39TG,France#
    3,M3,28,JDNS77,Brazil#
    4,M4,17,GG1919,Finland#
    5,M5,3,ACMAX22,Hong Kong#
    6,M6,9,AC1000,Singapore#
    7,M7,13,FN39TG,South Africa#
    8,M8,25,JDNS77,Australia#
    9,M9,11,GG1919,Mexico#
    10,M10,23,ACMAX22,China#
    11,M11,14,AC1000,Belgium#
    12,M12,26,FN39TG,Finland#
    13,M13,25,JDNS77,Saudi Arabia#
    14,M14,17,GG1919,Germany#
    15,M15,19,ACMAX22,Israel#
    16,M16,23,AC1000,Turkey#
    17,M17,11,FN39TG,Egypt#
    18,M18,25,JDNS77,Indonesia#
    19,M19,14,GG1919,Canada#
    20,M20,19,ACMAX22,Argentina
    15:34:18,914 INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
    15:34:19,104 INFO org.apache.flink.runtime.minicluster.FlinkMiniCluster - Starting FlinkMiniCluster.
    15:34:19,912 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
    // ..
    // ... more log statements
    // ..
    Exception in thread "main" java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.
        at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:979)
        at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:961)
        at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:84)
        at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:652)
        at main.scala.hortonworks.tutorial.HVACReadingsAnalysis$.main(HVACReadingsAnalysis.scala:60)
        at main.scala.hortonworks.tutorial.HVACReadingsAnalysis.main(HVACReadingsAnalysis.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

    Process finished with exit code 1

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Discarding-header-from-CSV-file-tp6474p6494.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
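For comparison, here is a minimal, Flink-independent sketch of what ignoreFirstLine = true is meant to do with the sample building.csv shown in the post. It drops the header line and then parses each remaining row into a case class. This is plain Scala, not Flink's CSV reader, and the names BuildingInfoRow, HeaderSkippingSketch and parseLines are hypothetical. One assumption is flagged in the code: buildingManager is typed as a String because the BuildingMgr values in the sample (M1, M2, ...) are not integers.

```scala
import scala.util.Try

// Hypothetical variant of the post's BuildingInformation (assumption):
// buildingManager is a String because the sample BuildingMgr values
// (M1, M2, ...) are not integers.
case class BuildingInfoRow(
    buildingID: Int,
    buildingManager: String,
    buildingAge: Int,
    productID: String,
    country: String)

object HeaderSkippingSketch {

  // Skips the first (header) line, then parses every remaining line.
  // A line that does not have exactly five fields, or whose numeric
  // fields are not integers, is returned unchanged as a Left.
  def parseLines(lines: Seq[String]): Seq[Either[String, BuildingInfoRow]] =
    lines.drop(1).map { line =>
      line.split(",", -1).map(_.trim) match {
        case Array(id, mgr, age, product, country) =>
          Try(BuildingInfoRow(id.toInt, mgr, age.toInt, product, country)).toOption.toRight(line)
        case _ =>
          Left(line)
      }
    }

  def main(args: Array[String]): Unit = {
    // A few lines taken from the CSV output in the post, header included.
    val sample = Seq(
      "BuildingID,BuildingMgr,BuildingAge,HVACproduct,Country",
      "1,M1,25,AC1000,USA",
      "5,M5,3,ACMAX22,Hong Kong")

    parseLines(sample).foreach(println)

    // In this sketch, an Int-typed manager field would reject every data row:
    println(Try("M1".toInt).isFailure)
  }
}
```

Running main prints Right(BuildingInfoRow(1,M1,25,AC1000,USA)) and Right(BuildingInfoRow(5,M5,3,ACMAX22,Hong Kong)), followed by true. The header line never reaches the parser, which is the behavior the ignoreFirstLine flag is expected to provide.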