Hi Ruurtjan,

Thanks for reporting this. There is already an issue tracking support for this feature: https://issues.apache.org/jira/browse/FLINK-16048

Currently, Flink SQL only supports the JSON, CSV, and standard Avro formats. The Confluent schema registry Avro format is not supported yet, but it is definitely on our roadmap, with high priority.
As a workaround, you can implement a DeserializationSchemaFactory for Confluent Avro on top of the existing ConfluentRegistryAvroDeserializationSchema <https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema>, and register the factory's class name in "/resources/META-INF/services/org.apache.flink.table.factories.TableFactory" so it is discovered via SPI. You can take AvroRowFormatFactory as an example.

Best,
Jark

On Fri, 3 Apr 2020 at 21:51, Ruurtjan Pul <ruurt...@gmail.com> wrote:

> Hi there,
>
> I'm not sure if this is possible, because it doesn't seem to be documented
> anywhere, so here we go...
>
> I've got a Kafka topic with Avro-encoded records that have been produced
> with a Confluent schema registry Kafka client. Note that these Avro records
> are slightly different from vanilla Avro messages, as the Confluent
> producer prepends some metadata that encodes the schema version used for
> encoding the record. So you can't just decode it with a regular Avro
> client, or you'll get:
>
> Flink SQL> select * from SomeTopic limit 10;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -1
>
> Now, I'd like to read this topic in a sql-client.sh session, but can't
> seem to work out how to do so. There is a
> ConfluentRegistryAvroDeserializationSchema
> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema>,
> for which I can only find a Flink jar
> <https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.10.0/>,
> not a Flink SQL jar. There are also the
> `flink-sql-connector-kafka_2.12-1.10.0.jar` and
> `flink-avro-1.10.0-sql-jar.jar` jars, but they suffer from the
> `Malformed data` issue I mentioned before.
>
> Is there any way to read from a Confluent schema registry Avro-encoded
> Kafka topic in a Flink SQL session?
> (Try saying that three times fast ;))
>
> Best,
> Ruurtjan
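
P.S. For context on the "Malformed data. Length is negative" error above: the Confluent serializer frames each record with a one-byte magic marker (0x00) and a 4-byte big-endian schema registry id before the actual Avro payload, so a plain Avro decoder misinterprets those leading bytes as Avro data. A minimal, Flink-free sketch of that framing (class and method names here are illustrative, not a real Flink or Confluent API):

```java
import java.nio.ByteBuffer;

public class ConfluentWireFormat {

    // Reads the schema registry id from a Confluent-framed record:
    // byte 0 is the magic marker 0x00, bytes 1-4 are a big-endian int id.
    static int schemaId(byte[] record) {
        ByteBuffer buf = ByteBuffer.wrap(record);
        byte magic = buf.get();
        if (magic != 0x00) {
            throw new IllegalArgumentException(
                "Not Confluent wire format, magic byte was " + magic);
        }
        return buf.getInt(); // 4-byte big-endian schema id
    }

    // The Avro payload starts after the 5-byte header; this is what a
    // schema-registry-aware deserializer actually hands to the Avro decoder.
    static byte[] avroPayload(byte[] record) {
        byte[] payload = new byte[record.length - 5];
        System.arraycopy(record, 5, payload, 0, payload.length);
        return payload;
    }

    public static void main(String[] args) {
        // A fake record: magic byte, schema id 42, then two payload bytes.
        byte[] record = {0x00, 0x00, 0x00, 0x00, 0x2A, 0x10, 0x20};
        System.out.println("schema id = " + schemaId(record));
        System.out.println("payload length = " + avroPayload(record).length);
    }
}
```

This is exactly the header that ConfluentRegistryAvroDeserializationSchema strips (and uses to look up the writer schema) before decoding, which is why wiring it in through a custom format factory resolves the error.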