Hi Hang,

My question is whether we can set a specific 'value.deserializer' as a table option for the Kafka connector. Is there any way to do that? I have already set 'value.format' in the code snippet below, so is that enough, and does it handle deserialization internally?
How do I create a custom format? Could you please share a link to a sample example?

Thanks
Ronak Beejawat

From: Hang Ruan <ruanhang1...@gmail.com>
Sent: Monday, January 10, 2022 3:06 PM
To: dev@flink.apache.org; Ronak Beejawat (rbeejawa) <rbeej...@cisco.com>
Cc: commun...@flink.apache.org; u...@flink.apache.org
Subject: Re: Regarding Connector Options - value.deserializer

Hi, Ronak,

I think you should implement a custom format yourself instead of overriding. The 'value.format' is a required table option.

Best,
Hang

Ronak Beejawat (rbeejawa) <rbeej...@cisco.com.invalid> wrote on Mon, Jan 10, 2022 at 17:09:

Hi Team,

Is there any way to use value.deserializer in the connector options of the Kafka connector via the SQL API?

Please find the code snippet below:

tableEnv.executeSql("CREATE TABLE cmrTable (\r\n"
    + " org_id STRING\r\n"
    + " ,cluster_id STRING\r\n"
    + " ,globalcallid_callmanagerid STRING\r\n"
    + " ,globalcallid_callid INT\r\n"
    + " ,callidentifier INT\r\n"
    + " ,varvqmetrics STRING\r\n"
    + " ,duration INT\r\n"
    + " )\r\n"
    + " WITH (\r\n"
    + " 'connector' = 'kafka'\r\n"
    + " ,'topic' = 'cmr'\r\n"
    + " ,'properties.bootstrap.servers' = 'b-1.telemetry-msk-cluster.h1qn4w.c1.kafka.us-east-1.amazonaws.com:9092'\r\n"
    + " ,'scan.startup.mode' = 'earliest-offset'\r\n"
    + " ,'properties.value.deserializer' = 'json'\r\n"
    + " ,'value.format' = 'json'\r\n"
    + " )");

Thanks
Ronak Beejawat
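
For reference, here is a minimal sketch of the same table definition relying on 'value.format' alone, as Hang's reply suggests. As far as the Kafka SQL connector documentation describes it, Flink sets the Kafka client's key and value deserializers itself, so a 'properties.value.deserializer' entry is not expected to take effect; treat that as an assumption to verify against your Flink version. The class name CmrTableDdl, the method build, and the bootstrapServers parameter are hypothetical and only serve the illustration.

```java
// Sketch only: rebuilds the CREATE TABLE statement from the thread, leaving out
// the Kafka client deserializer property and keeping 'value.format' = 'json'.
// CmrTableDdl, build, and bootstrapServers are hypothetical names.
final class CmrTableDdl {

    // Returns the DDL text; the broker address is passed in rather than hard-coded.
    static String build(String bootstrapServers) {
        return String.join("\n",
            "CREATE TABLE cmrTable (",
            "  org_id STRING,",
            "  cluster_id STRING,",
            "  globalcallid_callmanagerid STRING,",
            "  globalcallid_callid INT,",
            "  callidentifier INT,",
            "  varvqmetrics STRING,",
            "  duration INT",
            ") WITH (",
            "  'connector' = 'kafka',",
            "  'topic' = 'cmr',",
            "  'properties.bootstrap.servers' = '" + bootstrapServers + "',",
            "  'scan.startup.mode' = 'earliest-offset',",
            "  'value.format' = 'json'",
            ")");
    }

    public static void main(String[] args) {
        // Prints the statement so it can be inspected before use.
        System.out.println(build("localhost:9092"));
    }
}
```

The resulting string would be passed to tableEnv.executeSql(...) in the same way as the original snippet. If the built-in 'json' format does not fit the payload, a custom format is the route Hang describes; that part is not sketched here.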