Hi, could you please share the full error message? I think it should list the supported metadata columns.
Do you see the same error with 'debezium-json' format instead of 'debezium-avro-confluent'?

Regards,
Roman

On Wed, Sep 22, 2021 at 5:12 PM Harshvardhan Shinde <harshvardhan.shi...@oyorooms.com> wrote:
>
> Hi,
> I'm trying to access the metadata columns from the debezium source connector
> as documented here.
> However I'm getting the following error when I try to select the rows from
> the kafka table:
>
> flink.table.api.ValidationException: Invalid metadata key
> 'value.ingestion-timestamp' in column 'origin_ts'
>
> Getting the same issue for all the virtual columns. Please let me know what
> I'm doing wrong.
>
> Here's my table creation query:
>
> CREATE TABLE testFlink (
>   origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
>   event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
>   origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
>   origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
>   origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
>   origin_properties MAP<STRING, STRING> METADATA FROM
>     'value.source.properties' VIRTUAL,
>   id BIGINT,
>   number BIGINT,
>   created_at BIGINT,
>   updated_at BIGINT
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'source-staging-postgres-flink_test-82-2021-09-20.public.test',
>   'properties.bootstrap.servers' = '<BROKER_URL>:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'earliest-offset',
>   'value.format' = 'debezium-avro-confluent',
>   'value.debezium-avro-confluent.schema-registry.url' =
>     '<SCHEMA_REGISRTY>:8081'
> );
>
> Thanks.
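
For the 'debezium-json' comparison asked about in the reply, a minimal sketch of the table definition could look like the one below. It is an illustration under assumptions, not a confirmed fix. The table name testFlinkJson and the topic placeholder <JSON_TOPIC> are hypothetical. The topic in the quoted query is Avro-encoded, so this test would need a topic that carries Debezium records serialized as JSON. Only two of the metadata columns are kept to keep the check small.

```sql
-- Sketch only: same physical columns as the quoted query, but reading with
-- the 'debezium-json' value format. <JSON_TOPIC> is a hypothetical placeholder
-- for a topic that actually contains JSON-encoded Debezium change events.
CREATE TABLE testFlinkJson (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
  id BIGINT,
  number BIGINT,
  created_at BIGINT,
  updated_at BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = '<JSON_TOPIC>',
  'properties.bootstrap.servers' = '<BROKER_URL>:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
);

-- If this SELECT validates while the Avro variant does not, the metadata keys
-- are probably not exposed by 'debezium-avro-confluent' in the Flink version
-- in use.
SELECT origin_ts, origin_table, id FROM testFlinkJson;
```

If the JSON variant also fails validation, the full error message should still show which metadata keys the format accepts.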