Hi,

how can I get Flink's SQL client to cleanly sink some data to either the regular kafka connector or the upsert-kafka connector?

I have a table/topic with dummy data:

    CREATE TABLE metrics_brand_stream (
        `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
        WATERMARK FOR event_time AS event_time - INTERVAL '10' MINUTE,
        `partition` BIGINT METADATA VIRTUAL,
        `offset` BIGINT METADATA VIRTUAL,
        brand string,
        duration int,
        rating int
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'commercials_avro',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'avro-confluent',
        'avro-confluent.schema-registry.url' = 'http://localhost:8081/',
        'properties.group.id' = 'flink-test-001',
        'properties.bootstrap.servers' = 'localhost:9092'
    );

And the following aggregation:

    SELECT brand,
           COUNT(*) AS cnt,
           AVG(duration) AS duration_mean,
           AVG(rating) AS rating_mean
    FROM metrics_brand_stream
    GROUP BY brand;

I then define an output table:

    CREATE TABLE metrics_per_brand (
        brand string,
        cnt BIGINT,
        duration_mean DOUBLE,
        rating_mean DOUBLE
    ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'metrics_per_brand',
        'avro-confluent.schema-registry.url' = 'http://localhost:8081/',
        'properties.group.id' = 'flink-test-001',
        'properties.bootstrap.servers' = 'localhost:9092',
        'key.format' = 'avro-confluent',
        'value.format' = 'avro-confluent'
    );

and try to INSERT some result data:

    INSERT INTO metrics_per_brand
    SELECT brand,
           COUNT(*) AS cnt,
           AVG(duration) AS duration_mean,
           AVG(rating) AS rating_mean
    FROM metrics_brand_stream
    GROUP BY brand;

The query fails with:

    org.apache.flink.table.api.ValidationException: One or more required options are missing.
    Missing required options are:
    url

But none of the following pages seems to list the right configuration (or I am misreading the documentation):

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/

How can I sink data to Kafka after some arbitrary computation in the Flink SQL client, using either the kafka or the upsert-kafka connector, when the input is Avro with a schema from the Confluent Schema Registry and the output should also be serialized as Avro with its schema stored there?

Best,
Georg