Hi guys,

Could you help me set up a Kafka sink connector using the Avro format? I'm using PyFlink 1.11, OpenJDK 11, and executing the job locally (python my_script.py).
My code is simple: it calls a UDTF that for now only yields a fixed small binary. I use Kafka source and sink connectors. The source uses the Json format and works as expected. The sink works with the Csv format:

(t_env.connect(  # declare the external system to connect to
    Kafka()
        .version("universal")
        .topic("output")
        .property("bootstrap.servers", "localhost:9092")
        .property("zookeeper.connect", "localhost:2181")
 )
 .with_format(  # declare a format for this system
     Csv()  # Csv converts bytes to a base64 string...
 )
 .with_schema(  # declare the schema of the table
     Schema()
         .field("outf", DataTypes.BYTES())
 )
 .create_temporary_table("mySink"))

However, the Csv format serializes the bytes (VARBINARY) as a base64 string; that is the expected behavior, but not what I want. With the Avro format, I just get errors:

- Just replacing Csv() with Avro() in the code above, and adding Avro's dependency with

    conf = t_env.get_config().get_configuration()
    conf.set_string(
        "pipeline.jars",
        "file://<path_to_kafka_connector>.jar;file://<path_to_flink_avro>.jar")

  I got:

    *org.apache.flink.table.api.ValidationException: A definition of an Avro specific record class or Avro schema is required.*

- After looking at the PyFlink source code, I also passed an avro_schema argument to the constructor, Avro(avro_schema=<my_schema_in_string>), and got:

    *java.lang.ClassNotFoundException: org.apache.avro.io.DatumWriter*

- Using the SQL (DDL) declaration documented in [1], I got the same error as above.

I also have some doubts about how to create the schema, but I need Avro to work first. (See the P.S. below for my current best guess at the schema.)

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html

Thanks,
Rodrigo
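
P.S. In case it helps to see exactly what I tried, here is my best guess at the Avro sink declaration, with a hand-written schema for the single bytes column. The record name "Output" and the schema string itself are just my guesses; that is part of what I'm unsure about. This is the variant that raises the DatumWriter error above.

from pyflink.table import DataTypes
from pyflink.table.descriptors import Kafka, Avro, Schema

# Hand-written Avro schema for a single bytes column ("outf").
# The record name is an arbitrary guess on my part.
avro_schema = """
{
  "type": "record",
  "name": "Output",
  "fields": [
    {"name": "outf", "type": "bytes"}
  ]
}
"""

(t_env.connect(  # declare the external system to connect to
    Kafka()
        .version("universal")
        .topic("output")
        .property("bootstrap.servers", "localhost:9092")
        .property("zookeeper.connect", "localhost:2181")
 )
 .with_format(  # Avro format with an explicit schema string
     Avro(avro_schema=avro_schema)  # this is the call that ends in ClassNotFoundException
 )
 .with_schema(  # declare the schema of the table
     Schema()
         .field("outf", DataTypes.BYTES())
 )
 .create_temporary_table("mySink"))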