Hi, Ronak, I think you should implement a custom format by yourself instead of overriding. The 'value.format' is a required table option.
Best, Hang Ronak Beejawat (rbeejawa) <rbeej...@cisco.com.invalid> 于2022年1月10日周一 17:09写道: > Hi Team, > > Is there any way we use value.deserializer in Connector Options from kafka > via sql api? > > PFB below code snippt : > > tableEnv.executeSql("CREATE TABLE cmrTable (\r\n" > + " org_id STRING\r\n" > + " ,cluster_id STRING\r\n" > + " ,globalcallid_callmanagerid STRING\r\n" > + " ,globalcallid_callid INT\r\n" > + " ,callidentifier INT\r\n" > + " ,varvqmetrics STRING\r\n" > + " ,duration INT\r\n" > + " )\r\n" > + " WITH (\r\n" > + " 'connector' = 'kafka'\r\n" > + " ,'topic' = 'cmr'\r\n" > + " ,'properties.bootstrap.servers' = ' > b-1.telemetry-msk-cluster.h1qn4w.c1.kafka.us-east-1.amazonaws.com:9092 > '\r\n" > + " ,'scan.startup.mode' = 'earliest-offset'\r\n" > + " ,'properties.value.deserializer' = 'json'\r\n" > + " ,'value.format' = 'json'\r\n" > + " )"); > > > Thanks > Ronak Beejawat >