Hi Polarisary. Checked the flink codebase and your stacktraces, seems you need to format the timestamp as : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
The code is here: https://github.com/apache/flink/blob/38e4e2b8f9bc63a793a2bddef5a578e3f80b7376/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java#L340 On Fri, Nov 1, 2019 at 3:50 PM Polarisary <polaris...@gmail.com> wrote: > Hi All: > I have define kafka connector Descriptor, and registe Table > > tEnv.connect(new Kafka() > .version("universal") > .topic(tableName) > .startFromEarliest() > .property("zookeeper.connect", *“*xxx") > .property("bootstrap.servers", *“*xxx") > .property("group.id", *“*xxx")) > .withFormat(new Json().deriveSchema()) > .withSchema(new Schema() > > .field("rowtime", Types.SQL_TIMESTAMP) > .rowtime(new Rowtime() > .timestampsFromField("createTime") > .watermarksPeriodicBounded(300_000)) > .field("data", Types.ROW(dataFieldTypes))) > > .inAppendMode().registerTableSource(tableName); > > > > kafka input is: > { > > "data": [ > 18140781, > ], > "createTime": 1572577137596 > } > > Exception as follows: > Caused by: java.io.IOException: Failed to deserialize JSON object. > at > org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:129) > at > org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:72) > at > org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:203) > Caused by: java.time.format.DateTimeParseException: Text '1553080631582' > could not be parsed at index 0 > at > java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949) > at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777) > at > org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$createTimestampConverter$1dee6515$1(JsonRowDeserializationSchema.java:334) > at > org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232) > at > org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:403) > at > org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:382) > at > org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232) > at > org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:127) > ... 7 more > > polaris...@gmail.com > > > > >