Thanks for the quick replies, Dawid and Leonard. I had flink-json JARs for both 1.10 and 1.11. I deleted the 1.10 one and now it works!

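In case it helps anyone who hits the same NoSuchFieldError, here is a minimal sketch for spotting this kind of version clash. It assumes the JARs sit in a single directory and follow the usual `<artifact>-<version>.jar` naming. The function name `find_conflicting_jars` and the directory you pass in are my own placeholders, not part of Flink or pyFlink.

```python
import re
from collections import defaultdict
from pathlib import Path

# Splits names such as "flink-json-1.11.0.jar" into ("flink-json", "1.11.0").
# Assumption: the version is the first dash-separated part that starts with a digit.
JAR_NAME = re.compile(r"^(?P<artifact>.+?)-(?P<version>\d+(?:\.\d+)*[^/]*)\.jar$")


def find_conflicting_jars(jar_dir):
    """Return {artifact: [versions]} for artifacts present in more than one version."""
    versions = defaultdict(set)
    for jar in Path(jar_dir).glob("*.jar"):
        match = JAR_NAME.match(jar.name)
        if match:
            versions[match["artifact"]].add(match["version"])
    # Keep only artifacts that appear with at least two distinct versions.
    return {name: sorted(found) for name, found in versions.items() if len(found) > 1}


if __name__ == "__main__":
    import sys

    # Usage: python check_jars.py /path/to/your/jar/directory
    for name, found in find_conflicting_jars(sys.argv[1]).items():
        print(f"{name}: {', '.join(found)}")
```

Running it against the directory that holds your connector and format JARs lists every artifact that is present in more than one version. In my case that would have shown flink-json with both 1.10 and 1.11 versions.
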
On Tue, Jul 14, 2020 at 4:17 PM Leonard Xu <xbjt...@gmail.com> wrote:

> Hi Kale,
>
> I think you're using the correct TIMESTAMP data type in the JSON format, and
> this should work properly.
> But from the log, it looks like you used an old version of the `flink-json`
> dependency. Could you check that the version of `flink-json` is 1.11.0?
>
> Best,
> Leonard Xu
>
> On Jul 14, 2020, at 18:07, Manas Kale <manaskal...@gmail.com> wrote:
>
> Hi,
> I am trying to parse this JSON message:
>
> {"monitorId": 789, "deviceId": "abcd", "data": 144.0, "state": 2,
> "time_st": "2020-07-14 15:15:19.600000"}
>
> using pyFlink 1.11 DDL with this code:
>
> ddl_source = f"""
>     CREATE TABLE {INPUT_TABLE} (
>         `monitorId` STRING,
>         `deviceId` STRING,
>         `state` INT,
>         `time_st` TIMESTAMP(3),
>         WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND,
>         `data` DOUBLE
>     ) WITH (
>         'connector' = 'kafka',
>         'topic' = '{INPUT_TOPIC}',
>         'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>         'format' = 'json'
>     )
> """
>
> I used [1] for the DDL format and [2] for the timestamp string format.
> However, when I run this I get the following error:
>
> Caused by: java.io.IOException: Failed to deserialize JSON '{"monitorId":
> 789, "deviceId": "abcd", "data": 144.0, "state": 2, "time_st": "2020-07-14
> 15:15:19.600000"}'.
>     at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:126)
>     at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:76)
>     at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> Caused by: java.lang.NoSuchFieldError: SQL_TIMESTAMP_FORMAT
>
> I believe I am using the correct TIMESTAMP format in the JSON message
> according to the documentation, so I can't figure out what the error could be.
>
> Any help would be appreciated!
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/time_attributes.html#defining-in-create-table-ddl-1
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#timestamp
>
> Thanks,
> Manas