Hi,

Which Flink version are you upgrading from? Flink 1.9 changed how timestamps are parsed in the JSON format.

Your error might be related to those changes:

https://issues.apache.org/jira/browse/FLINK-11727
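
As a rough illustration of the kind of failure shown in the stack trace below: the JSON format passes the raw field text to a java.time DateTimeFormatter, which expects a formatted date-time string rather than epoch milliseconds. The following is a minimal plain-Java sketch, not Flink code. ISO_OFFSET_DATE_TIME is used only as a stand-in (an assumption, not necessarily the formatter Flink uses), and the class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

class EpochMillisDemo {

    // Returns true if the text parses as an ISO-8601 date-time with offset,
    // e.g. "2019-02-09T09:38:24.542Z". The formatter is an assumed stand-in.
    static boolean parsesAsIsoDateTime(String text) {
        try {
            DateTimeFormatter.ISO_OFFSET_DATE_TIME.parse(text);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    // Converts epoch milliseconds to a LocalDateTime, interpreting them in UTC.
    static LocalDateTime fromEpochMillis(long epochMillis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        // A formatted timestamp string is accepted by the formatter.
        System.out.println(parsesAsIsoDateTime("2019-02-09T09:38:24.542Z"));
        // A string of digits (epoch milliseconds) is rejected.
        System.out.println(parsesAsIsoDateTime("1549705104542"));
        // The same value converts cleanly when treated as a number.
        System.out.println(fromEpochMillis(1549705104542L));
    }
}
```

If recv_time really carries epoch milliseconds, one possible direction is to read it as a numeric field and convert it afterwards, or to have the producer emit a formatted timestamp string. Which of these fits depends on your setup.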

I hope this helps.

Timo


On 07.02.20 07:57, sunfulin wrote:
Hi, guys
When upgrading to Flink 1.10 rc0, I am confused by the new DataTypes schema definition.
I am reading and consuming records from Kafka with a JSON schema like
{"user_id":1866071598998,"recv_time":1547011832729}, and the code snippet is:



.withSchema(
     new Schema()
             // eventTime
             .field("rowtime", DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class)).rowtime(
                 new Rowtime()
                     .timestampsFromField("recv_time")
                     .watermarksPeriodicBounded(1000)
             )
             .field("user_id", DataTypes.STRING())






However, I am running into an issue and get an exception like the following:


Caused by: java.time.format.DateTimeParseException: Text '1549705104542' could not be parsed at index 0
at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)

