Hi Federico,

I can't reproduce the error in my local environment. Would you mind sharing
us your code and the full exception stack trace? This will help us diagnose
the problem. Thanks.

Federico D'Ambrosio <fedex...@gmail.com> 于2019年7月24日周三 下午5:45写道:

> Hi Caizhi,
>
> thank you for your response.
>
> 1) I see, I'll use a compatible string format
>
> 2) I'm defining the case class like this:
>
> case class cEvent(state: String, id: String, device: String,
>                   instance: String, subInstance: String, groupLabel: String, 
> time: Timestamp)
>
> object cEvent {
>   implicit val typeInformation: TypeInformation[cEvent] = 
> TypeInformation.of(classOf[cEvent])
> }
>
>
> I'm assuming I'm doing something wrong with the TypeInformation, since the
> table records are not being converted correctly. The precise error is the
> following:
>
> Arity [7] of result [ArrayBuffer(String, String, String, String, String,
> String, Timestamp)] does not match the number[1] of requested type
> [GenericType<cEvent>].
>
> I noticed there's a CaseClassTypeInfo which can be created from
> Types.CASECLASS[cEvent], but I'm not sure how to use it after defining
> the table.
>
> Thank you,
> Federico
>
> Il giorno mer 24 lug 2019 alle ore 10:42 Caizhi Weng <tsreape...@gmail.com>
> ha scritto:
>
>> Hi Federico,
>>
>> 1) As far as I know, you can't set a format for timestamp parsing
>> currently (see `SqlTimestampParser`, it just feeds your string to
>> `SqlTimestamp.valueOf`, so your timestamp format must be compatible with
>> SqlTimestamp).
>>
>> 2) How do you define your case class? You have to define its parameter
>> list and nothing in its body to make it work. For example: case class
>> Event(a: String, b: String, time: Timestamp)
>>
>> Federico D'Ambrosio <fedex...@gmail.com> 于2019年7月24日周三 下午4:10写道:
>>
>>> Hello everyone,
>>>
>>> I've always used the DataStream API and now I'm trying out the Table API
>>> to create a datastream from a CSV and I'm finding a couple of issues:
>>>
>>> 1) I'm reading a csv with 7 total fields, the 7th of which is a date
>>> serialized as a Spark TimestampType, written on the csv like this:
>>> 2019-07-19T15:31:38.000+01:00. I've defined the TableSource like this:
>>>     val csvTableSource = CsvTableSource.builder()
>>>     .path("sourcefile.csv")
>>>     .fieldDelimiter(",")
>>> /* fields of Types.STRING */
>>>     .field("time", Types.SQL_TIMESTAMP)
>>>     .build()
>>> I'm transforming the Table to a DataStream of type Event:
>>>
>>> class Event {
>>>   // fields of type String
>>>   var time: Timestamp = _
>>> }
>>>
>>> val ds: DataStream[Event] = tEnv.toAppendStream[Event](table)
>>> But when I'm reading from the CSV the following parsing error occurs:
>>>
>>> Caused by: org.apache.flink.api.common.io.ParseException: Parsing error
>>> for column 7 of row '......,2019-07-20T09:52:07.000+01:00' originated by
>>> SqlTimestampParser: NUMERIC_VALUE_FORMAT_ERROR.
>>>
>>> So, I'm wondering: is it possible to set a DateFormat or something to
>>> make sure the parsing succeeds? I've tried also Types.SQL_DATE and
>>> Types.SQL_TIME, but they fail with same exception.
>>>
>>> 2) My first option was to make Event as a case class, but with the same
>>> table definition, I was having trouble with the conversion, with an error
>>> telling that the "Arity of 7 fields was not compatible with the destination
>>> arity of 1, of type GenericType<Event>". What's the correct way to handle
>>> case classes? I changed to using a class (which I believe uses the POJO
>>> serializer) and it works ok, but I'm still wondering how to make it work
>>> with Case Classes, which come quite useful sometimes.
>>>
>>> Thank you very much,
>>> Federico
>>> --
>>> Federico D'Ambrosio
>>>
>>
>
> --
> Federico D'Ambrosio
>

Reply via email to