Hi,

Here's the complete error log:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Invalid metadata key 'value.ingestion-timestamp' in column 'origin_ts' of table 'flink_hive.harsh_test.testflink'. The DynamicTableSource class 'org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource' supports the following metadata keys for reading:

topic
partition
headers
leader-epoch
offset
timestamp
timestamp-type
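For reference, the keys in that list are the ones the Kafka connector itself exposes, so they can be declared without the 'value.' prefix. Below is a minimal sketch of such a table, not a fix for the Debezium fields: the table name testFlinkKafkaMeta and the kafka_* column names are made up for illustration, the column types are what I understand the Kafka connector to use for these keys, and the WITH options are copied from my original query.

```sql
-- Hypothetical table for illustration; only connector-level metadata keys
-- from the error message are used (no 'value.*' keys).
CREATE TABLE testFlinkKafkaMeta (
  kafka_topic STRING METADATA FROM 'topic' VIRTUAL,
  kafka_partition INT METADATA FROM 'partition' VIRTUAL,
  kafka_offset BIGINT METADATA FROM 'offset' VIRTUAL,
  -- Kafka record timestamp, not the Debezium ingestion or source timestamp
  kafka_ts TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp' VIRTUAL,
  id BIGINT,
  number BIGINT,
  created_at BIGINT,
  updated_at BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'source-staging-postgres-flink_test-82-2021-09-20.public.test',
  'properties.bootstrap.servers' = '<BROKER_URL>:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-avro-confluent',
  'value.debezium-avro-confluent.schema-registry.url' = '<SCHEMA_REGISTRY>:8081'
);
```

This only gives the Kafka record metadata; the Debezium-specific fields (source database, schema, table, properties) are still not reachable this way.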
I'll need some more time to test it with the debezium-json format.

On Fri, Sep 24, 2021 at 12:52 AM Roman Khachatryan <ro...@apache.org> wrote:

> Hi,
> could you please share the full error message?
> I think it should list the supported metadata columns.
>
> Do you see the same error with 'debezium-json' format instead of
> 'debezium-avro-confluent'?
>
> Regards,
> Roman
>
>
> On Wed, Sep 22, 2021 at 5:12 PM Harshvardhan Shinde
> <harshvardhan.shi...@oyorooms.com> wrote:
> >
> > Hi,
> > I'm trying to access the metadata columns from the debezium source connector as documented here.
> > However I'm getting the following error when I try to select the rows from the kafka table:
> >
> > flink.table.api.ValidationException: Invalid metadata key 'value.ingestion-timestamp' in column 'origin_ts'
> >
> > Getting the same issue for all the virtual columns. Please let me know what I'm doing wrong.
> >
> > Here's my table creation query:
> >
> > CREATE TABLE testFlink (
> >   origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
> >   event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
> >   origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
> >   origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
> >   origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
> >   origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
> >   id BIGINT,
> >   number BIGINT,
> >   created_at BIGINT,
> >   updated_at BIGINT
> > ) WITH (
> >   'connector' = 'kafka',
> >   'topic' = 'source-staging-postgres-flink_test-82-2021-09-20.public.test',
> >   'properties.bootstrap.servers' = '<BROKER_URL>:9092',
> >   'properties.group.id' = 'testGroup',
> >   'scan.startup.mode' = 'earliest-offset',
> >   'value.format' = 'debezium-avro-confluent',
> >   'value.debezium-avro-confluent.schema-registry.url' = '<SCHEMA_REGISTRY>:8081'
> > );
> >
> > Thanks.
>

--
Thanks and Regards,
Harshvardhan
Data Platform