[ https://issues.apache.org/jira/browse/FLINK-20999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17272149#comment-17272149 ]
Svend Vanderveken commented on FLINK-20999:
-------------------------------------------

(not sure why the link to the PR doesn't seem to be attaching automatically, adding it manually)

> Confluent Avro Format should document how to serialize kafka keys
> -----------------------------------------------------------------
>
> Key: FLINK-20999
> URL: https://issues.apache.org/jira/browse/FLINK-20999
> Project: Flink
> Issue Type: Improvement
> Components: Documentation, Table SQL / Ecosystem
> Affects Versions: 1.12.0
> Reporter: Svend Vanderveken
> Assignee: Svend Vanderveken
> Priority: Minor
> Fix For: 1.13.0
>
> The [Confluent Avro Format|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/avro-confluent.html]
> only shows examples of how to serialize/deserialize Kafka values. Also, the
> parameter descriptions do not always make clear whether an option influences the
> source or the sink behaviour, IMHO.
> This seems surprising, especially in the context of a Kafka sink connector,
> since keys are such an important concept in that case.
> Adding examples of how to serialize/deserialize Kafka keys would add clarity.
> While it can be argued that a connector format is independent of the
> underlying storage, showing Kafka-oriented examples in this case
> (i.e., with a concept of "key" and "value") makes sense here, since this
> connector is very much designed with Kafka in mind.
>
> I'm happy to submit a PR if this suggested change is approved.
>
> I suggest adding the following:
>
> h3. Writing to Kafka while keeping the keys in "raw" big-endian format
>
> {code:java}
> CREATE TABLE OUTPUT_TABLE (
>   user_id BIGINT,
>   item_id BIGINT,
>   category_id BIGINT,
>   behavior STRING
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'user_behavior',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'raw',
>   'key.raw.endianness' = 'big-endian',
>   'key.fields' = 'user_id',
>   'value.format' = 'avro-confluent',
>   'value.avro-confluent.schema-registry.url' = 'http://localhost:8081',
>   'value.avro-confluent.schema-registry.subject' = 'user_behavior'
> )
> {code}
>
> h3. Writing to Kafka while registering both the key and the value in the schema registry
>
> {code:java}
> CREATE TABLE OUTPUT_TABLE (
>   user_id BIGINT,
>   item_id BIGINT,
>   category_id BIGINT,
>   behavior STRING
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'user_behavior',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   -- => this will register a {user_id: long} Avro type in the schema registry.
>   -- Watch out: schema evolution in the context of a Kafka key is almost never
>   -- backward nor forward compatible in practice due to hash partitioning.
>   'key.avro-confluent.schema-registry.url' = 'http://localhost:8081',
>   'key.avro-confluent.schema-registry.subject' = 'user_behavior_key',
>   'key.format' = 'avro-confluent',
>   'key.fields' = 'user_id',
>   'value.format' = 'avro-confluent',
>   'value.avro-confluent.schema-registry.url' = 'http://localhost:8081',
>   'value.avro-confluent.schema-registry.subject' = 'user_behavior_value'
> )
> {code}
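>
> (For completeness, a quick usage sketch that could follow either OUTPUT_TABLE
> definition above; the literal values are purely illustrative:)
>
> {code:java}
> -- illustrative only: writes a few made-up rows, so that the key (user_id) and
> -- the value (the remaining columns) are serialized with the formats configured above
> INSERT INTO OUTPUT_TABLE VALUES
>   (1001, 5005, 42, 'buy'),
>   (1002, 7007, 42, 'pv');
> {code}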
>
> h3. Reading from Kafka with both the key and value schema in the registry, while resolving field name clashes
>
> {code:java}
> CREATE TABLE INPUT_TABLE (
>   -- user_id as read from the kafka key:
>   from_kafka_key_user_id BIGINT,
>
>   -- user_id, and other fields, as read from the kafka value:
>   user_id BIGINT,
>   item_id BIGINT,
>   category_id BIGINT,
>   behavior STRING
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'user_behavior',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'avro-confluent',
>   'key.avro-confluent.schema-registry.url' = 'http://localhost:8081',
>   'key.fields' = 'from_kafka_key_user_id',
>   -- Adds a column prefix when mapping the avro fields of the kafka key to
>   -- columns of this table, to avoid clashes with the avro fields of the value
>   -- (both contain 'user_id' in this example)
>   'key.fields-prefix' = 'from_kafka_key_',
>   'value.format' = 'avro-confluent',
>   -- the key fields must not be included here since they are dealt with above
>   'value.fields-include' = 'EXCEPT_KEY',
>   'value.avro-confluent.schema-registry.url' = 'http://localhost:8081'
> )
> {code}
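>
> (And, as a purely illustrative follow-up to the reading example above, a sketch of a
> query that uses the prefixed key column next to the value columns:)
>
> {code:java}
> -- illustrative only: the key-derived column and the value columns can be
> -- combined freely once 'key.fields-prefix' has resolved the name clash
> SELECT
>   from_kafka_key_user_id,
>   user_id,
>   item_id,
>   behavior
> FROM INPUT_TABLE
> WHERE behavior = 'buy';
> {code}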