Hi Ruurtjan,

Thanks for reporting this. There is already an issue tracking support for
this feature: https://issues.apache.org/jira/browse/FLINK-16048
Currently, Flink SQL only supports the JSON, CSV, and standard Avro formats.
The Confluent Avro format is not supported yet, but it is definitely on our
roadmap with high priority.

As a workaround, you can implement a DeserializationSchemaFactory for
Confluent Avro on top of the existing ConfluentRegistryAvroDeserializationSchema
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema>,
and register the factory's fully qualified class name in the
"META-INF/services/org.apache.flink.table.factories.TableFactory" file on the
classpath (e.g. under "src/main/resources") so it is discovered via SPI.
You can take AvroRowFormatFactory as an example; a rough sketch follows below.
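To make the idea concrete, here is a minimal sketch of such a factory,
modeled on AvroRowFormatFactory (Flink 1.10). Everything that is not an
existing Flink class is an assumption: the "confluent-avro" format
identifier, the property keys, and the
ConfluentRegistryAvroRowDeserializationSchema wrapper, which you would
need to write yourself to convert the GenericRecord produced by
ConfluentRegistryAvroDeserializationSchema into a Row.

    // Minimal sketch only. The "confluent-avro" identifier, the property
    // keys, and ConfluentRegistryAvroRowDeserializationSchema are
    // hypothetical; the factory interfaces and base class are real Flink
    // 1.10 classes.
    package com.example.formats;

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;

    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.table.factories.DeserializationSchemaFactory;
    import org.apache.flink.table.factories.TableFormatFactoryBase;
    import org.apache.flink.types.Row;

    public class ConfluentAvroRowFormatFactory
            extends TableFormatFactoryBase<Row>
            implements DeserializationSchemaFactory<Row> {

        public ConfluentAvroRowFormatFactory() {
            // format.type = "confluent-avro", property version 1, and
            // allow deriving the format schema from the table schema.
            super("confluent-avro", 1, true);
        }

        @Override
        protected List<String> supportedFormatProperties() {
            return Arrays.asList(
                    "format.avro-schema",          // Avro schema string
                    "format.schema-registry.url"); // Confluent registry URL
        }

        @Override
        public DeserializationSchema<Row> createDeserializationSchema(
                Map<String, String> properties) {
            final String avroSchema = properties.get("format.avro-schema");
            final String registryUrl =
                    properties.get("format.schema-registry.url");

            // ConfluentRegistryAvroDeserializationSchema (from
            // flink-avro-confluent-registry) strips the 5-byte Confluent
            // header and looks up the writer schema in the registry, but
            // it produces GenericRecord, not Row. The wrapper below is
            // hypothetical: you would write it yourself, reusing the
            // GenericRecord-to-Row conversion logic found in
            // AvroRowDeserializationSchema.
            return new ConfluentRegistryAvroRowDeserializationSchema(
                    avroSchema, registryUrl);
        }
    }

The factory then has to be registered for SPI discovery: put its fully
qualified class name (here, com.example.formats.ConfluentAvroRowFormatFactory)
on a line of its own in
"META-INF/services/org.apache.flink.table.factories.TableFactory".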

Best,
Jark


On Fri, 3 Apr 2020 at 21:51, Ruurtjan Pul <ruurt...@gmail.com> wrote:

> Hi there,
>
> I'm not sure if this is possible, because it doesn't seem to be documented
> anywhere, so here we go...
>
> I've got a Kafka topic with Avro encoded records that have been produced
> with a Confluent schema registry Kafka client. Note that these Avro records
> are slightly different from vanilla Avro messages, as the Confluent
> producer adds (prepends?) some metadata that encodes the schema version
> that's used for encoding the record. So you can't just decode it with a
> regular Avro client, or you'll get:
> > Flink SQL> select * from SomeTopic limit 10;
> > [ERROR] Could not execute SQL statement. Reason:
> > org.apache.avro.AvroRuntimeException: Malformed data. Length is
> negative: -1
>
> Now, I'd like to read this topic in a sql-client.sh session, but can't
> seem to work out how to do so. There is a
> ConfluentRegistryAvroDeserializationSchema
> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema>,
> for which I can only find a Flink jar
> <https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.10.0/>,
> not a Flink sql jar. There's also the
> `flink-sql-connector-kafka_2.12-1.10.0.jar` and
> `flink-avro-1.10.0-sql-jar.jar` jars, but they suffer the `Malformed data`
> issue I mentioned before.
>
> Is there any way to read from a Confluent schema registry Avro encoded
> Kafka topic in a Flink SQL session? (Try saying that three times fast ;))
>
> Best,
> Ruurtjan
>
