Hello everyone, I've always used the DataStream API and I'm now trying out the Table API to create a DataStream from a CSV file, and I'm running into a couple of issues:
1) I'm reading a CSV with 7 fields in total, the 7th of which is a date serialized as a Spark TimestampType, written in the CSV like this: 2019-07-19T15:31:38.000+01:00.

I've defined the TableSource like this:

    val csvTableSource = CsvTableSource.builder()
      .path("sourcefile.csv")
      .fieldDelimiter(",")
      /* fields of Types.STRING */
      .field("time", Types.SQL_TIMESTAMP)
      .build()

and I'm converting the Table to a DataStream of type Event:

    class Event {
      // fields of type String
      var time: Timestamp = _
    }

    val ds: DataStream[Event] = tEnv.toAppendStream[Event](table)

But when reading from the CSV, the following parsing error occurs:

    Caused by: org.apache.flink.api.common.io.ParseException: Parsing error for column 7 of row '......,2019-07-20T09:52:07.000+01:00' originated by SqlTimestampParser: NUMERIC_VALUE_FORMAT_ERROR.

So I'm wondering: is it possible to set a DateFormat, or something similar, so that the parsing succeeds? I've also tried Types.SQL_DATE and Types.SQL_TIME, but they fail with the same exception. (I've put a sketch of the workaround I'm currently considering in the P.S. below.)

2) My first option was to make Event a case class, but with the same table definition I had trouble with the conversion: I got an error saying that the "Arity of 7 fields was not compatible with the destination arity of 1, of type GenericType<Event>". What's the correct way to handle case classes? I switched to a plain class (which I believe uses the POJO serializer) and that works fine, but I'm still wondering how to make it work with case classes, which come in quite handy sometimes. (There's a sketch of the fallback I have in mind for this one in the P.S. as well.)

Thank you very much,
Federico

--
Federico D'Ambrosio
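
P.S. For (1), the workaround I'm currently considering is to declare the "time" column as Types.STRING in the CsvTableSource and do the conversion myself after leaving the Table API, going through Row. A rough, untested sketch (the column index and the use of OffsetDateTime are my own assumptions, not something taken from the CsvTableSource API):

    import java.sql.Timestamp
    import java.time.OffsetDateTime

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.types.Row

    // "time" declared as Types.STRING in the CsvTableSource instead of Types.SQL_TIMESTAMP
    val rows: DataStream[Row] = tEnv.toAppendStream[Row](table)

    val ds: DataStream[Event] = rows.map { row =>
      val event = new Event()
      // ... copy the six String fields from the row ...
      // column 7 sits at index 6; the value is ISO-8601 with an offset,
      // which OffsetDateTime.parse handles out of the box
      val raw = row.getField(6).asInstanceOf[String]
      event.time = Timestamp.from(OffsetDateTime.parse(raw).toInstant)
      event
    }

It feels like a lot of boilerplate just to read a timestamp, though, hence the question.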
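
P.P.S. For (2), if case classes can't be fed to toAppendStream directly, the fallback I have in mind is the same Row detour, building the case class by hand in the map. Again just a sketch, with the field names elided and the EventCC name made up by me:

    import java.sql.Timestamp
    import java.time.OffsetDateTime

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.types.Row

    // case class variant of Event (field names elided); passing it straight to
    // toAppendStream[EventCC](table) is what fails with the arity/GenericType error
    case class EventCC(/* six String fields, */ time: Timestamp)

    val dsCC: DataStream[EventCC] = tEnv
      .toAppendStream[Row](table)
      .map { row =>
        EventCC(
          // ... the six String fields, e.g. row.getField(0).asInstanceOf[String], ...
          Timestamp.from(OffsetDateTime.parse(row.getField(6).asInstanceOf[String]).toInstant)
        )
      }

I'd still prefer the direct conversion if there's a supported way to do it.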