I am reading a bunch of records from a CSV file. A record looks like this: "4/1/2014 0:11:00",40.769,-73.9549,"B02512"
I intend to treat these records as SQL Rows and then process. Here's the code: ---------------------------------------- package org.nirmalya.exercise import java.time.LocalDateTime import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.api.table.TableEnvironment import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.table._ import org.apache.flink.api.table.sources.CsvTableSource import org.apache.flink.api.scala.table.TableConversions import org.apache.flink.api.scala._ /** * Created by nirmalya on 4/2/17. */ object TrafficDataTrainer { def main(args: Array[String]): Unit = { case class Trip(timeOfPickUp: LocalDateTime, lat: Double, lon: Double, base: String) val env = ExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) val myDataStorePath = "/home/nirmalya/Downloads/Traffic" val csvTableSource = new CsvTableSource( myDataStorePath + "/traffic-raw-data-apr14.csv", Array("timeOfPickUp", "lat", "lon", "base"), ( Array[org.apache.flink.api.common.typeinfo.TypeInformation[_]]( Types.TIMESTAMP, Types.DOUBLE, Types.DOUBLE, Types.STRING ) ) ) tableEnv.registerTableSource("TrafficData",csvTableSource) val trafficTable = tableEnv.scan("TrafficData") val result = trafficTable.select("timeOfPickUp,lat,lon,base") val trafficDataSet = new TableConversions(result).toDataSet[Trip] trafficDataSet.collect().take(10).foreach(println) } } ---------------------------------------- At run time, the exception that is thrown is: ------------------------------------------------------ Exception in thread "main" java.lang.IllegalArgumentException: The type 'java.sql.Date' is not supported for the CSV input format. 
at org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldsGeneric(GenericCsvInputFormat.java:306) at org.apache.flink.api.table.runtime.io.RowCsvInputFormat.<init>(RowCsvInputFormat.scala:52) at org.apache.flink.api.table.sources.CsvTableSource.createCsvInput(CsvTableSource.scala:99) at org.apache.flink.api.table.sources.CsvTableSource.getDataSet(CsvTableSource.scala:78) at org.apache.flink.api.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:55) at org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:274) at org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139) at org.apache.flink.api.scala.table.TableConversions.toDataSet(TableConversions.scala:41) at org.nirmalya.exercise.UberDataTrainer$.main(UberDataTrainer.scala:45) at org.nirmalya.exercise.UberDataTrainer.main(UberDataTrainer.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) ------------------------------------------------------ I see that in org.apache.flink.api.common.io.GenericCsvInputFormat:303, the check fails because the stated type isn't one of the known types. However, the constructor of *CsvTableSource* accepts /Types.DATE/ as well as /Types.TIMESTAMP/ (I tried with both of them, and the exception is the same). Could someone please point out where I am going wrong? -- Nirmalya -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Table-API-java-sql-DateTime-is-not-supported-tp11439.html Sent from the Apache Flink User Mailing List archive at Nabble.com.