Hi,

how can I get Flink's SQL client to sink data to Kafka using either the
regular kafka or the upsert-kafka connector?

I have a table/topic with dummy data:
CREATE TABLE metrics_brand_stream (
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR event_time AS event_time - INTERVAL '10' MINUTE,
    `partition` BIGINT METADATA VIRTUAL,
    `offset` BIGINT METADATA VIRTUAL,
    brand STRING,
    duration INT,
    rating INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'commercials_avro',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'avro-confluent',
    'avro-confluent.schema-registry.url' = 'http://localhost:8081/',
    'properties.group.id' = 'flink-test-001',
    'properties.bootstrap.servers' = 'localhost:9092'
);

And the following aggregation:

SELECT brand,
       COUNT(*) AS cnt,
       AVG(duration) AS duration_mean,
       AVG(rating) AS rating_mean
FROM metrics_brand_stream
GROUP BY brand;
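As far as I understand, this unbounded GROUP BY produces an updating
(changelog) result rather than an append-only stream, which is why I assume
the plain kafka connector alone would not accept it and upsert-kafka is the
more natural sink here.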

When trying to define an output table:

CREATE TABLE metrics_per_brand (
    brand STRING,
    cnt BIGINT,
    duration_mean DOUBLE,
    rating_mean DOUBLE
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'metrics_per_brand',
    'avro-confluent.schema-registry.url' = 'http://localhost:8081/',
    'properties.group.id' = 'flink-test-001',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'avro-confluent',
    'value.format' = 'avro-confluent'
);

And trying to INSERT some result data:

INSERT INTO metrics_per_brand
SELECT brand,
       COUNT(*) AS cnt,
       AVG(duration) AS duration_mean,
       AVG(rating) AS rating_mean
FROM metrics_brand_stream
GROUP BY brand;

The query fails with:

org.apache.flink.table.api.ValidationException: One or more required
options are missing.

Missing required options are:

url

I assume the missing 'url' is the schema registry URL that the key/value
formats expect under some prefixed option name, but neither
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/
nor
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/
nor
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/
seems to spell out the option name I need (or I am misreading the
documentation).
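
My current best guess from the format docs is that, when avro-confluent is
used as a key or value format, its options have to be prefixed with 'key.'
or 'value.', and that upsert-kafka additionally requires a primary key,
i.e. something like the following (untested sketch):

CREATE TABLE metrics_per_brand (
    brand STRING,
    cnt BIGINT,
    duration_mean DOUBLE,
    rating_mean DOUBLE,
    -- my understanding: upsert-kafka needs an explicit (unenforced) primary key
    PRIMARY KEY (brand) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'metrics_per_brand',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'avro-confluent',
    -- schema registry URL prefixed per format, if I read the docs correctly
    'key.avro-confluent.schema-registry.url' = 'http://localhost:8081/',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.schema-registry.url' = 'http://localhost:8081/'
);

Is that the intended configuration?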


How can I sink data to Kafka after some arbitrary computation using the
Flink SQL client, with either the kafka or upsert-kafka connector, when the
input is Avro with a schema from the Confluent schema registry and the
output should register its schema there as well (and serialize using Avro)?
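
In case the plain kafka connector turns out to be the better fit: my
understanding is that it only accepts append-only streams, so I assume the
aggregation would have to be windowed instead, e.g. (sketch, assuming a
one-hour tumbling window over event_time):

SELECT brand,
       window_start,
       window_end,
       COUNT(*) AS cnt,
       AVG(duration) AS duration_mean,
       AVG(rating) AS rating_mean
FROM TABLE(TUMBLE(TABLE metrics_brand_stream, DESCRIPTOR(event_time), INTERVAL '1' HOUR))
GROUP BY brand, window_start, window_end;

Would that let me use the regular kafka connector with 'format' = 'avro-confluent'?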


Best,
Georg
