Hi Ronak,

We cannot set a specific 'value.deserializer' in the table options. 'key.deserializer' and 'value.deserializer' are always set to 'org.apache.kafka.common.serialization.ByteArrayDeserializer'. The connector reads raw bytes, and the format configured in 'value.format' decodes them.
If you want to implement a format, you could take a look at JsonFormatFactory.java in flink-formats/flink-json. The format will be loaded via SPI.

Best,
Hang

Ronak Beejawat (rbeejawa) <rbeej...@cisco.com> wrote on Mon, Jan 10, 2022 at 17:51:

> Hi Hang,
>
> My question is: can we use a specific 'value.deserializer' in the table
> options via the Kafka connector? Is there any way or not? I have already
> set 'value.format' in the code snippet below, so is that enough, and does
> it handle deserialization internally?
>
> How do I create a custom format? Can you please share a link to a sample
> example?
>
> Thanks
> Ronak Beejawat
>
> *From:* Hang Ruan <ruanhang1...@gmail.com>
> *Sent:* Monday, January 10, 2022 3:06 PM
> *To:* d...@flink.apache.org; Ronak Beejawat (rbeejawa) <rbeej...@cisco.com>
> *Cc:* commun...@flink.apache.org; user@flink.apache.org
> *Subject:* Re: Regarding Connector Options - value.deserializer
>
> Hi, Ronak,
>
> I think you should implement a custom format yourself instead of
> overriding. The 'value.format' is a required table option.
>
> Best,
> Hang
>
> Ronak Beejawat (rbeejawa) <rbeej...@cisco.com.invalid> wrote on Mon, Jan 10, 2022 at 17:09:
>
> Hi Team,
>
> Is there any way we can use value.deserializer in the connector options
> for Kafka via the SQL API?
>
> PFB the code snippet below:
>
> tableEnv.executeSql("CREATE TABLE cmrTable (\r\n"
>     + " org_id STRING\r\n"
>     + " ,cluster_id STRING\r\n"
>     + " ,globalcallid_callmanagerid STRING\r\n"
>     + " ,globalcallid_callid INT\r\n"
>     + " ,callidentifier INT\r\n"
>     + " ,varvqmetrics STRING\r\n"
>     + " ,duration INT\r\n"
>     + " )\r\n"
>     + " WITH (\r\n"
>     + " 'connector' = 'kafka'\r\n"
>     + " ,'topic' = 'cmr'\r\n"
>     + " ,'properties.bootstrap.servers' = 'b-1.telemetry-msk-cluster.h1qn4w.c1.kafka.us-east-1.amazonaws.com:9092'\r\n"
>     + " ,'scan.startup.mode' = 'earliest-offset'\r\n"
>     + " ,'properties.value.deserializer' = 'json'\r\n"
>     + " ,'value.format' = 'json'\r\n"
>     + " )");
>
> Thanks
> Ronak Beejawat
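
For reference, here is a minimal sketch of the table definition from the original question with the 'properties.value.deserializer' entry dropped, on the assumption that 'value.format' = 'json' alone is meant to handle decoding. The class name CmrTableDdl, the helper createTableSql, and the localhost:9092 address are placeholders introduced for illustration and are not from the thread. The columns and the remaining options are copied from the quoted snippet.

```java
// Hypothetical helper class (not part of Flink or of the original thread).
final class CmrTableDdl {

    // Builds the CREATE TABLE statement for the given bootstrap servers.
    // No 'properties.value.deserializer' entry is included: this sketch
    // relies on 'value.format' to select how the message value is decoded.
    static String createTableSql(String bootstrapServers) {
        return String.join("\n",
            "CREATE TABLE cmrTable (",
            "  org_id STRING,",
            "  cluster_id STRING,",
            "  globalcallid_callmanagerid STRING,",
            "  globalcallid_callid INT,",
            "  callidentifier INT,",
            "  varvqmetrics STRING,",
            "  duration INT",
            ") WITH (",
            "  'connector' = 'kafka',",
            "  'topic' = 'cmr',",
            "  'properties.bootstrap.servers' = '" + bootstrapServers + "',",
            "  'scan.startup.mode' = 'earliest-offset',",
            "  'value.format' = 'json'",
            ")");
    }

    public static void main(String[] args) {
        // Placeholder address; replace with the real broker list.
        System.out.println(createTableSql("localhost:9092"));
    }
}
```

With a TableEnvironment in scope, the resulting string would be passed to tableEnv.executeSql(...) in the same way as in the quoted snippet.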