Hey Siva, Can you try adding the following to your table's configuration:
'value.fields-include' = 'EXCEPT_KEY' Best, --Gunnar On Wed, 28 May 2025 at 20:52, Siva Ram Gujju <sivaram2...@gmail.com> wrote: > Hello, > > I am new to Flink. Running into an issue to read Key from Kafka messages > in Flink SQL Client. > > I have an Order Kafka topic with below messages: > > Key: {"orderNumber":"1234"} > Value: {"orderDate":"20250528","productId":"Product123"} > > I am able to get the key and value by using > > kafka-console-consumer.bat --property "print.key=true" --topic Orders > --from-beginning --bootstrap-server xxxxxx > > > I tried to create a table in Flink SQL: > > CREATE TABLE Orders ( > orderNumber STRING, > orderDate STRING, > productId STRING > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'Orders', > 'properties.bootstrap.servers' = 'xxxxxx', > 'properties.group.id' = 'FlinkGroupId', > 'format' = 'json', > 'scan.startup.mode' = 'earliest-offset', > 'key.format' = 'json', > 'key.fields' = 'orderNumber;' > );When I query the table select * from Orders; orderNumber is printing as > "NULL" instead of the actual orderNumber. > > > - I tried changing the type to string. Still it is printing as "NULL" > - If I send a malformed JSON in the key, Flink is throwing an error. > So, it is clear that Flink is trying to parse the key field as JSON. But > why is it printing as "NULL" in the select query output? > > > Can anyone please advise what I am doing wrong? > > Thanks, > Siva >