Hi Flink team,

I am trying to use a temporal join with two Kafka-based streams ("tables"), where the message key is formatted as JSON and the value as Confluent Schema Registry-based Avro. When I run the join, I get the error: "*The Kafka table 'default_catalog.default_database.sensor_readings' with 'avro-confluent' format doesn't support defining PRIMARY KEY constraint on the table, because it can't guarantee the semantic of primary key*."
This is strange, because the primary key is not taken from the value (which is avro-confluent-based) but from the message key (JSON format).

The DDLs are:

```
CREATE TABLE sensor_readings (
    kafka_key_device_id VARCHAR primary key NOT ENFORCED,
    co DOUBLE,
    humidity DOUBLE,
    motion BOOLEAN,
    temp DOUBLE,
    ampere_hour DOUBLE,
    ts TIMESTAMP(3),
    proctime AS PROCTIME(),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'sensor.readings.json_key3.sr',
    'properties.bootstrap.servers' = '{cc_config["bootstrap.servers"]}',
    'properties.group.id' = 'device.tumbling.w.sr.sql',
    'scan.startup.mode' = 'earliest-offset',
    'properties.auto.offset.reset' = 'earliest',
    'key.format' = 'json',
    'key.fields' = 'kafka_key_device_id',
    'key.fields-prefix' = 'kafka_key_',
    'value.fields-include' = 'EXCEPT_KEY',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = '{sr_config["url"]}'
)

CREATE TABLE device_account_stats (
    kafka_key_device_id VARCHAR primary key NOT ENFORCED,
    metric_1 DOUBLE,
    metric_2 INTEGER,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'device_account.metrics.json_key3.sr',
    'properties.bootstrap.servers' = '{cc_config["bootstrap.servers"]}',
    'properties.group.id' = 'device.tumbling.w.sr.sql',
    'scan.startup.mode' = 'earliest-offset',
    'properties.auto.offset.reset' = 'earliest',
    'key.format' = 'json',
    'key.fields' = 'kafka_key_device_id',
    'key.fields-prefix' = 'kafka_key_',
    'value.fields-include' = 'EXCEPT_KEY',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = '{sr_config["url"]}'
)
```

The join query is:

```
tumbling_w_sql = """
    SELECT
        sr.kafka_key_device_id as device_id,
        das.metric_1,
        das.metric_2,
        TUMBLE_START(sr.proctime, INTERVAL '30' SECONDS) AS window_start,
        TUMBLE_END(sr.proctime, INTERVAL '30' SECONDS) AS window_end,
        SUM(sr.ampere_hour) AS charge_consumed
    FROM sensor_readings sr
    JOIN device_account_stats FOR SYSTEM_TIME AS OF sr.proctime AS das
        ON sr.kafka_key_device_id = das.kafka_key_device_id
    GROUP BY
        TUMBLE(sr.proctime, INTERVAL '30' SECONDS),
        sr.kafka_key_device_id,
        das.metric_1,
        das.metric_2
"""
```

If I remove the primary keys, the error becomes "*Temporal Table Join requires primary key in versioned table, but no primary key can be found*".

Is it possible to do a temporal join when the value is in Avro format?

Regards,
Olga