Hi, I'm trying to access the metadata columns from the Debezium source connector as documented here <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/debezium/#available-metadata>. However, I'm getting the following error when I try to select rows from the Kafka table:
flink.table.api.ValidationException: Invalid metadata key 'value.ingestion-timestamp' in column 'origin_ts'

Getting the same issue for all the *virtual* columns. Please let me know what I'm doing wrong. Here's my table creation query:

CREATE TABLE testFlink (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
  origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
  origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
  id BIGINT,
  number BIGINT,
  created_at BIGINT,
  updated_at BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'source-staging-postgres-flink_test-82-2021-09-20.public.test',
  'properties.bootstrap.servers' = '<BROKER_URL>:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-avro-confluent',
  'value.debezium-avro-confluent.schema-registry.url' = '<SCHEMA_REGISRTY>:8081'
);

Thanks.