Hi, You don’t need to call execute() method after calling print() method. print() method triggers the execution. The exception is raised because you call execute() after print() method.
Regards, Chiwan Park > On Apr 27, 2016, at 6:35 PM, nsengupta <sengupta.nirma...@gmail.com> wrote: > > Till, > > Thanks for looking into this. > > I have removed the toList() from the collect() function, to align the code > with what I generally do in a Flink application. It throws an Exception, and > I can't figure out why. > > *Here's my code (shortened for brevity):* > > case class BuildingInformation(buildingID: Int, buildingManager: Int, > buildingAge: Int, productID: String, country: String) > > object HVACReadingsAnalysis { > > def main(args: Array[String]): Unit = { > > val envDefault = ExecutionEnvironment.getExecutionEnvironment > > val buildings = > readBuildingInfo(envDefault,"./SensorFiles/building.csv") > > buildings.print > > envDefault.execute("HVAC Simulation") > } > > private def readBuildingInfo(env: ExecutionEnvironment, inputPath: String) > = { > > // [NS]: I can see the lines, read correctly from the CSV file here > println("As read from CSV file") > println(Source.fromFile(inputPath).getLines.toList.mkString("#\n")) > > // [NS]: Then, I read the same file using the library function > env.readCsvFile [BuildingInformation] ( > inputPath, > ignoreFirstLine = true, > pojoFields = > Array("buildingID","buildingManager","buildingAge","productID","country") > ) > } > > > *Relevant portion of the output: > * > As read from CSV file > BuildingID,BuildingMgr,BuildingAge,HVACproduct,Country# > 1,M1,25,AC1000,USA# > 2,M2,27,FN39TG,France# > 3,M3,28,JDNS77,Brazil# > 4,M4,17,GG1919,Finland# > 5,M5,3,ACMAX22,Hong Kong# > 6,M6,9,AC1000,Singapore# > 7,M7,13,FN39TG,South Africa# > 8,M8,25,JDNS77,Australia# > 9,M9,11,GG1919,Mexico# > 10,M10,23,ACMAX22,China# > 11,M11,14,AC1000,Belgium# > 12,M12,26,FN39TG,Finland# > 13,M13,25,JDNS77,Saudi Arabia# > 14,M14,17,GG1919,Germany# > 15,M15,19,ACMAX22,Israel# > 16,M16,23,AC1000,Turkey# > 17,M17,11,FN39TG,Egypt# > 18,M18,25,JDNS77,Indonesia# > 19,M19,14,GG1919,Canada# > 20,M20,19,ACMAX22,Argentina > 15:34:18,914 INFO org.apache.flink.api.java.ExecutionEnvironment > > - The job has 0 registered types and 0 default Kryo serializers > 15:34:19,104 INFO org.apache.flink.runtime.minicluster.FlinkMiniCluster > > - Starting FlinkMiniCluster. > 15:34:19,912 INFO akka.event.slf4j.Slf4jLogger > > - Slf4jLogger started > > > // .. > // ... more log statements > // .. > > Exception in thread "main" java.lang.RuntimeException: No new data sinks > have been defined since the last execution. The last execution refers to the > latest call to 'execute()', 'count()', 'collect()', or 'print()'. > at > org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:979) > at > org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:961) > at > org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:84) > at > org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:652) > at > main.scala.hortonworks.tutorial.HVACReadingsAnalysis$.main(HVACReadingsAnalysis.scala:60) > at > main.scala.hortonworks.tutorial.HVACReadingsAnalysis.main(HVACReadingsAnalysis.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) > > Process finished with exit code 1 > > > > > -- > View this message in context: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Discarding-header-from-CSV-file-tp6474p6494.html > Sent from the Apache Flink User Mailing List archive. mailing list archive at > Nabble.com.