Hi, I am trying to parse this JSON message: {"monitorId": 789, "deviceId": "abcd", "data": 144.0, "state": 2, "time_st": "2020-07-14 15:15:19.600000"} using pyFlink 1.11 DDL with this code:
ddl_source = f""" CREATE TABLE {INPUT_TABLE} ( `monitorId` STRING, `deviceId` STRING, `state` INT, `time_st` TIMESTAMP(3), WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND, `data` DOUBLE ) WITH ( 'connector' = 'kafka', 'topic' = '{INPUT_TOPIC}', 'properties.bootstrap.servers' = '{LOCAL_KAFKA}', 'format' = 'json' ) """ I used *[1]* for the DDL format and *[2]* for the timestamp string format. However, when I run this I get the following error : *Caused by: java.io.IOException: Failed to deserialize JSON '{"monitorId": 789, "deviceId": "abcd", "data": 144.0, "state": 2, "time_st": "2020-07-14 15:15:19.600000"}'.* at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:126) at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:76) at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) *Caused by: java.lang.NoSuchFieldError: SQL_TIMESTAMP_FORMAT* I believe I am using the correct TIMESTAMP format in the JSON message according to the documentation so can't figure out what could be the error. Any help would be appreciated! [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/time_attributes.html#defining-in-create-table-ddl-1 [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#timestamp Thanks, Manas