Hi,
could you please share the full error message?
I think it should list the supported metadata columns.

Do you see the same error with 'debezium-json' format instead of
'debezium-avro-confluent' ?

Regards,
Roman


On Wed, Sep 22, 2021 at 5:12 PM Harshvardhan Shinde
<harshvardhan.shi...@oyorooms.com> wrote:
>
> Hi,
> I'm trying to access the metadata columns from the debezium source connector 
> as documented here.
> However I'm getting the following error when I try to select the rows from 
> the kafka table:
>
> flink.table.api.ValidationException: Invalid metadata key 
> 'value.ingestion-timestamp' in column 'origin_ts'
>
> Getting the same issue for all the virtual columns. Please let me know what 
> I'm doing wrong.
>
> Here's my table creation query:
>
> CREATE TABLE testFlink (
>   origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
>   event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
>   origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
>   origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
>   origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
>   origin_properties MAP<STRING, STRING> METADATA FROM 
> 'value.source.properties' VIRTUAL,
>   id BIGINT,
>   number BIGINT,
>   created_at BIGINT,
>   updated_at BIGINT
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'source-staging-postgres-flink_test-82-2021-09-20.public.test',
>   'properties.bootstrap.servers' = '<BROKER_URL>:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'earliest-offset',
>   'value.format' = 'debezium-avro-confluent',
>   'value.debezium-avro-confluent.schema-registry.url' = 
> '<SCHEMA_REGISRTY>:8081'
> );
>
> Thanks.

Reply via email to