Hi all, I'm using flink 1.13.5 (as I was originally using the ververica Flink CDC connector) and am trying to understand something. I'm just using the Flink SQL CLI at this stage to verify that I can stream a PostgreSQL table into Flink SQL to compute a continuous materialised view. I was inspecting the kafka messages that were being published by debezium originally and noticed that they were incredibly verbose including all new/old values, this was because I was using REPLICA IDENTITY FULL on the source tables.
I've now created unique indexes using the primary keys and REPLICA IDENTITY USING INDEX [INDEX]. I understand that the changed rows can now be matched using their index row, meaning we don't need to send the before contents of the row to identify it. When running my simple select * query on the table I get the following error: Flink SQL> select * from devices_device; *[ERROR] Could not execute SQL statement. Reason:java.lang.IllegalStateException: The "before" field of UPDATE message is null, if you are using Debezium Postgres Connector, please check the Postgres table has been set REPLICA IDENTITY to FULL level.* My table definition: CREATE TABLE devices_device ( id INT NOT NULL, legacy_device_type_id INT, endpoint_id INT NOT NULL, logical_device_id INT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'topic' = 'django-db.public.devices_device', 'properties.bootstrap.servers' = 'kafka:9092', 'format' = 'debezium-json' ); The message makes perfect sense, but I can't quite understand why I can't use REPLICA IDENTITY USING INDEX? Does anyone know if this was a decision that was made at some point or it's not technically possible for some reason? Note: I will change to using REPLICA IDENTITY FULL so I can continue working for now but It's not something I want to put into production. Thanks for your consideration! -- This email and any attachments are proprietary and confidential and are intended solely for the use of the individual to whom it is addressed. Any views or opinions expressed are solely those of the author and do not necessarily reflect or represent those of SwitchDin Pty Ltd. If you have received this email in error, please let us know immediately by reply email and delete it from your system. You may not use, disseminate, distribute or copy this message nor disclose its contents to anyone. SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300 Australia