Hi Federico,

I can't reproduce the error in my local environment. Would you mind sharing your code and the full exception stack trace with us? This will help us diagnose the problem. Thanks.
Federico D'Ambrosio <fedex...@gmail.com> wrote on Wed, Jul 24, 2019 at 5:45 PM:

> Hi Caizhi,
>
> thank you for your response.
>
> 1) I see, I'll use a compatible string format.
>
> 2) I'm defining the case class like this:
>
> case class cEvent(state: String, id: String, device: String,
>     instance: String, subInstance: String, groupLabel: String,
>     time: Timestamp)
>
> object cEvent {
>   implicit val typeInformation: TypeInformation[cEvent] =
>     TypeInformation.of(classOf[cEvent])
> }
>
> I'm assuming I'm doing something wrong with the TypeInformation, since
> the table records are not being converted correctly. The precise error is
> the following:
>
> Arity [7] of result [ArrayBuffer(String, String, String, String, String,
> String, Timestamp)] does not match the number [1] of requested type
> [GenericType<cEvent>].
>
> I noticed there's a CaseClassTypeInfo which can be created from
> Types.CASECLASS[cEvent], but I'm not sure how to use it after defining
> the table.
>
> Thank you,
> Federico
>
> On Wed, Jul 24, 2019 at 10:42 AM Caizhi Weng <tsreape...@gmail.com> wrote:
>
>> Hi Federico,
>>
>> 1) As far as I know, you can't set a format for timestamp parsing
>> currently (see `SqlTimestampParser`: it just feeds your string to
>> `SqlTimestamp.valueOf`, so your timestamp format must be compatible with
>> SqlTimestamp).
>>
>> 2) How do you define your case class? You have to define everything in
>> its parameter list and nothing in its body to make it work. For example:
>> case class Event(a: String, b: String, time: Timestamp)
>>
>> Federico D'Ambrosio <fedex...@gmail.com> wrote on Wed, Jul 24, 2019 at 4:10 PM:
>>
>>> Hello everyone,
>>>
>>> I've always used the DataStream API and now I'm trying out the Table
>>> API to create a DataStream from a CSV, and I'm running into a couple of
>>> issues:
>>>
>>> 1) I'm reading a CSV with 7 fields in total, the 7th of which is a date
>>> serialized as a Spark TimestampType, written in the CSV like this:
>>> 2019-07-19T15:31:38.000+01:00. I've defined the TableSource like this:
>>>
>>> val csvTableSource = CsvTableSource.builder()
>>>   .path("sourcefile.csv")
>>>   .fieldDelimiter(",")
>>>   /* fields of Types.STRING */
>>>   .field("time", Types.SQL_TIMESTAMP)
>>>   .build()
>>>
>>> I'm converting the Table into a DataStream of type Event:
>>>
>>> class Event {
>>>   // fields of type String
>>>   var time: Timestamp = _
>>> }
>>>
>>> val ds: DataStream[Event] = tEnv.toAppendStream[Event](table)
>>>
>>> But when reading from the CSV, the following parsing error occurs:
>>>
>>> Caused by: org.apache.flink.api.common.io.ParseException: Parsing error
>>> for column 7 of row '......,2019-07-20T09:52:07.000+01:00' originated by
>>> SqlTimestampParser: NUMERIC_VALUE_FORMAT_ERROR.
>>>
>>> So, I'm wondering: is it possible to set a date format or something to
>>> make sure the parsing succeeds? I've also tried Types.SQL_DATE and
>>> Types.SQL_TIME, but they fail with the same exception.
>>>
>>> 2) My first attempt was to make Event a case class, but with the same
>>> table definition I was having trouble with the conversion, with an error
>>> saying that the "Arity of 7 fields was not compatible with the destination
>>> arity of 1, of type GenericType<Event>". What's the correct way to handle
>>> case classes? I switched to a regular class (which I believe uses the POJO
>>> serializer) and it works fine, but I'm still wondering how to make it work
>>> with case classes, which come in quite handy sometimes.
>>>
>>> Thank you very much,
>>> Federico
>>> --
>>> Federico D'Ambrosio
>>
>
> --
> Federico D'Ambrosio
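
For reference, a minimal sketch of the case-class route, with the diagnosis that seems most likely from the error message: TypeInformation.of(classOf[cEvent]) goes through the Java type extractor, which cannot analyze Scala case classes and falls back to a GenericTypeInfo, which is exactly the arity-1 GenericType<cEvent> named in the error. Dropping the hand-written implicit in the companion object and letting the Scala API's implicit createTypeInformation macro (pulled in by importing org.apache.flink.streaming.api.scala._) derive a CaseClassTypeInfo should make the arities line up. The table name "csvTable", the capitalized class name CEvent, and the environment factory are illustrative rather than taken from the thread, and the sketch assumes the timestamp parsing from point 1 is solved separately:

import java.sql.Timestamp

import org.apache.flink.streaming.api.scala._                    // StreamExecutionEnvironment, DataStream, createTypeInformation macro
import org.apache.flink.table.api.scala.StreamTableEnvironment   // Scala variant of the table environment

// Parameters only, an empty body, and no companion object providing an implicit
// TypeInformation: the macro imported above derives a CaseClassTypeInfo of arity 7.
case class CEvent(state: String, id: String, device: String,
                  instance: String, subInstance: String, groupLabel: String,
                  time: Timestamp)

object CaseClassConversionSketch {
  def main(args: Array[String]): Unit = {
    val env  = StreamExecutionEnvironment.getExecutionEnvironment
    // Flink 1.9-style factory; earlier releases used TableEnvironment.getTableEnvironment(env).
    val tEnv = StreamTableEnvironment.create(env)

    // Assumes the CsvTableSource from the thread has been registered under this (illustrative) name.
    val table = tEnv.scan("csvTable")

    // With a CaseClassTypeInfo in scope, the seven table columns convert into
    // the seven case class fields instead of one opaque GenericType.
    val events: DataStream[CEvent] = tEnv.toAppendStream[CEvent](table)
    events.print()

    env.execute("case class conversion sketch")
  }
}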
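
And a sketch of one possible workaround for the timestamp format, under the constraint Caizhi describes (SqlTimestampParser hands the string to java.sql.Timestamp.valueOf, which only accepts the yyyy-mm-dd hh:mm:ss[.fffffffff] form): declare the time column as a plain STRING in the CsvTableSource and convert the ISO-8601 offset string yourself after the Table-to-DataStream conversion. The field names mirror the case class from the thread; the table name and environment setup are again illustrative:

import java.sql.Timestamp
import java.time.OffsetDateTime

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.sources.CsvTableSource

// Intermediate shape with the timestamp still as the raw string written by Spark.
case class RawEvent(state: String, id: String, device: String,
                    instance: String, subInstance: String, groupLabel: String,
                    time: String)

// Final shape, same as in the previous sketch.
case class CEvent(state: String, id: String, device: String,
                  instance: String, subInstance: String, groupLabel: String,
                  time: Timestamp)

object TimestampWorkaroundSketch {
  def main(args: Array[String]): Unit = {
    val env  = StreamExecutionEnvironment.getExecutionEnvironment
    // Flink 1.9-style factory; earlier releases used TableEnvironment.getTableEnvironment(env).
    val tEnv = StreamTableEnvironment.create(env)

    val csvTableSource = CsvTableSource.builder()
      .path("sourcefile.csv")
      .fieldDelimiter(",")
      .field("state", Types.STRING)
      .field("id", Types.STRING)
      .field("device", Types.STRING)
      .field("instance", Types.STRING)
      .field("subInstance", Types.STRING)
      .field("groupLabel", Types.STRING)
      .field("time", Types.STRING)          // keep it a string so SqlTimestampParser never sees it
      .build()

    tEnv.registerTableSource("csvTable", csvTableSource)

    val events: DataStream[CEvent] = tEnv
      .toAppendStream[RawEvent](tEnv.scan("csvTable"))
      .map { raw =>
        // "2019-07-19T15:31:38.000+01:00" parses directly as an OffsetDateTime.
        val ts = Timestamp.from(OffsetDateTime.parse(raw.time).toInstant)
        CEvent(raw.state, raw.id, raw.device, raw.instance,
               raw.subInstance, raw.groupLabel, ts)
      }

    events.print()
    env.execute("timestamp workaround sketch")
  }
}

Alternatively, if the file can be rewritten on the Spark side in a Timestamp.valueOf-compatible format (e.g. 2019-07-19 15:31:38.000), the original Types.SQL_TIMESTAMP field definition should work as-is.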