Hi Flink team,

I am trying to use a temporal join with two Kafka-based streams ("tables"),
where the key is formatted as JSON and the value as Confluent Schema
Registry-based Avro.
When joining, I get the error: "*The Kafka table
'default_catalog.default_database.sensor_readings' with 'avro-confluent'
format doesn't support defining PRIMARY KEY constraint on the table,
because it can't guarantee the semantic of primary key*."

This is strange, because the primary key is taken not from the value
(avro-confluent), but from the message key (JSON format).

The DDLs are:


```

CREATE TABLE sensor_readings (
    kafka_key_device_id VARCHAR primary key NOT ENFORCED,

    co DOUBLE,
    humidity DOUBLE,
    motion BOOLEAN,
    temp DOUBLE,
    ampere_hour DOUBLE,
    ts TIMESTAMP(3),
    proctime AS PROCTIME(),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'sensor.readings.json_key3.sr',
    'properties.bootstrap.servers' = '{cc_config["bootstrap.servers"]}',
    'properties.group.id' = 'device.tumbling.w.sr.sql',
    'scan.startup.mode' = 'earliest-offset',
    'properties.auto.offset.reset' = 'earliest',
    'key.format' = 'json',
    'key.fields' = 'kafka_key_device_id',
    'key.fields-prefix' = 'kafka_key_',
    'value.fields-include' = 'EXCEPT_KEY',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = '{sr_config["url"]}'
)


CREATE TABLE device_account_stats (
    kafka_key_device_id VARCHAR primary key NOT ENFORCED,
    metric_1 DOUBLE,
    metric_2 INTEGER,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'device_account.metrics.json_key3.sr',
    'properties.bootstrap.servers' = '{cc_config["bootstrap.servers"]}',
    'properties.group.id' = 'device.tumbling.w.sr.sql',
    'scan.startup.mode' = 'earliest-offset',
    'properties.auto.offset.reset' = 'earliest',
    'key.format' = 'json',
    'key.fields' = 'kafka_key_device_id',
    'key.fields-prefix' = 'kafka_key_',
    'value.fields-include' = 'EXCEPT_KEY',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = '{sr_config["url"]}'
)

```
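The `{cc_config[...]}` and `{sr_config[...]}` placeholders in the DDLs are presumably filled in on the Python side before the statements reach Flink. Below is a minimal sketch of that step. It assumes the DDL is built as a Python f-string, uses hypothetical values for `cc_config` and `sr_config`, and shows only part of the `sensor_readings` DDL:

```python
# A minimal sketch, assuming the DDL is built as a Python f-string.
# cc_config and sr_config hold HYPOTHETICAL values for illustration only.
cc_config = {"bootstrap.servers": "broker:9092"}
sr_config = {"url": "http://schema-registry:8081"}

# Some value columns and options from the full DDL are omitted for brevity;
# the {...} expressions are substituted when the string is built.
sensor_readings_ddl = f"""
CREATE TABLE sensor_readings (
    kafka_key_device_id VARCHAR PRIMARY KEY NOT ENFORCED,
    ampere_hour DOUBLE,
    ts TIMESTAMP(3),
    proctime AS PROCTIME(),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'sensor.readings.json_key3.sr',
    'properties.bootstrap.servers' = '{cc_config['bootstrap.servers']}',
    'key.format' = 'json',
    'key.fields' = 'kafka_key_device_id',
    'key.fields-prefix' = 'kafka_key_',
    'value.fields-include' = 'EXCEPT_KEY',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = '{sr_config['url']}'
)
"""

# Inspect the rendered statement before handing it to Flink.
print(sensor_readings_ddl)
```

The rendered string would then be passed to PyFlink's `TableEnvironment.execute_sql`, so Flink only ever sees the concrete broker and Schema Registry addresses.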
And the join:
```

tumbling_w_sql = """
        SELECT
            sr.kafka_key_device_id as device_id,
            das.metric_1,
            das.metric_2,
            TUMBLE_START(sr.proctime, INTERVAL '30' SECONDS) AS window_start,
            TUMBLE_END(sr.proctime, INTERVAL '30' SECONDS) AS window_end,
            SUM(sr.ampere_hour) AS charge_consumed
        FROM sensor_readings sr
        JOIN device_account_stats FOR SYSTEM_TIME AS OF sr.proctime AS das
            ON sr.kafka_key_device_id = das.kafka_key_device_id
        GROUP BY
            TUMBLE(sr.proctime, INTERVAL '30' SECONDS),
            sr.kafka_key_device_id,
            das.metric_1,
            das.metric_2
    """

```
If I remove the primary keys, the error is "*Temporal Table Join requires
primary key in versioned table, but no primary key can be found*".

Is it possible to do a temporal join when the value is in Avro format?

Regards,
Olga
