Hi,
Here's the complete error log:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Invalid metadata key
'value.ingestion-timestamp' in column 'origin_ts' of table
'flink_hive.harsh_test.testflink'. The DynamicTableSource class
'org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource'
supports the following metadata keys for reading:
topic
partition
headers
leader-epoch
offset
timestamp
timestamp-type

I'll need some more time to test it with debezium-json format.

On Fri, Sep 24, 2021 at 12:52 AM Roman Khachatryan <ro...@apache.org> wrote:

> Hi,
> could you please share the full error message?
> I think it should list the supported metadata columns.
>
> Do you see the same error with 'debezium-json' format instead of
> 'debezium-avro-confluent' ?
>
> Regards,
> Roman
>
>
> On Wed, Sep 22, 2021 at 5:12 PM Harshvardhan Shinde
> <harshvardhan.shi...@oyorooms.com> wrote:
> >
> > Hi,
> > I'm trying to access the metadata columns from the debezium source
> connector as documented here.
> > However I'm getting the following error when I try to select the rows
> from the kafka table:
> >
> > flink.table.api.ValidationException: Invalid metadata key
> 'value.ingestion-timestamp' in column 'origin_ts'
> >
> > Getting the same issue for all the virtual columns. Please let me know
> what I'm doing wrong.
> >
> > Here's my table creation query:
> >
> > CREATE TABLE testFlink (
> >   origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp'
> VIRTUAL,
> >   event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
> >   origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
> >   origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
> >   origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
> >   origin_properties MAP<STRING, STRING> METADATA FROM
> 'value.source.properties' VIRTUAL,
> >   id BIGINT,
> >   number BIGINT,
> >   created_at BIGINT,
> >   updated_at BIGINT
> > ) WITH (
> >   'connector' = 'kafka',
> >   'topic' =
> 'source-staging-postgres-flink_test-82-2021-09-20.public.test',
> >   'properties.bootstrap.servers' = '<BROKER_URL>:9092',
> >   'properties.group.id' = 'testGroup',
> >   'scan.startup.mode' = 'earliest-offset',
> >   'value.format' = 'debezium-avro-confluent',
> >   'value.debezium-avro-confluent.schema-registry.url' =
> '<SCHEMA_REGISRTY>:8081'
> > );
> >
> > Thanks.
>


-- 
Thanks and Regards,
Harshvardhan
Data Platform

Reply via email to