Thank you, Xingbo. I've managed to get it working by adding the Avro jar and the three artifacts from the *com.fasterxml.jackson.core* group [1]. Is it also required to add the jackson-mapper-asl jar? As for joda-time, I suppose it won't be required, as I won't use date types in my Avro schema.
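In case it helps anyone else, this is roughly how I'm registering them now via pipeline.jars (the file paths are placeholders for my local copies; the three jackson artifacts are jackson-core, jackson-databind and jackson-annotations):

conf = t_env.get_config().get_configuration()
# pipeline.jars takes a semicolon-separated list of file:// URLs
conf.set_string(
    "pipeline.jars",
    ";".join([
        "file://<path_to_kafka_connector>.jar",
        "file://<path_to_avro>.jar",
        "file://<path_to_jackson_core>.jar",
        "file://<path_to_jackson_databind>.jar",
        "file://<path_to_jackson_annotations>.jar",
    ]),
)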
About using Avro, I'd like to know whether pyFlink supports the Avro union type. I've found this old e-mail [2] that mentions it, but for Java. If pyFlink supports it, how would I declare the schema? Can I define the schema in an external .avsc file and import it, maybe through Avro(avro_schema)? (I've put a rough sketch of what I'd like to write at the very end of this message, after the quoted thread.)

[1] https://search.maven.org/search?q=g:com.fasterxml.jackson.core
[2] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/AVRO-Union-type-support-in-Flink-td14318.html

Thanks,
Rodrigo

On Tue, Aug 11, 2020 at 12:09 AM Xingbo Huang <hxbks...@gmail.com> wrote:

> Hi Rodrigo,
>
> Flink doesn't ship an avro uber jar, so you need to add all the
> dependency jars manually, such as avro, jackson-core-asl,
> jackson-mapper-asl and joda-time in release-1.11.
> However, I found a JIRA issue [1] from a few days ago that provides a
> default avro uber jar.
>
> [1] https://issues.apache.org/jira/browse/FLINK-18802
>
> Best,
> Xingbo
>
> Rodrigo de Souza Oliveira Brochado <rodrigo.broch...@predito.com.br>
> wrote on Tue, Aug 11, 2020 at 7:30 AM:
>
>> Hi guys,
>>
>> Could you help me set up a Kafka sink connector using the Avro format?
>> I'm using pyFlink 1.11, OpenJDK 11, and executing the job locally
>> (python my_script.py).
>>
>> My code is simple: a call to a UDTF that for now only yields a fixed
>> small binary. I use Kafka source and sink connectors. The source uses
>> the Json format and works as expected. The sink works with the Csv
>> format:
>>
>> (t_env.connect(  # declare the external system to connect to
>>         Kafka()
>>         .version("universal")
>>         .topic("output")
>>         .property("bootstrap.servers", "localhost:9092")
>>         .property("zookeeper.connect", "localhost:2181")
>>     )
>>     .with_format(  # declare a format for this system
>>         Csv()  # Csv converts bytes to a base64 string....
>>     )
>>     .with_schema(  # declare the schema of the table
>>         Schema()
>>         .field("outf", DataTypes.BYTES())
>>     )
>>     .create_temporary_table("mySink"))
>>
>> The Csv format converts the bytes (varbinary) to a base64 string as
>> expected, but that is not what I want. With the Avro format, I just get
>> errors:
>>
>> - Just replacing Csv() in the code above with Avro(), and adding Avro's
>> dependency with
>>
>> conf = t_env.get_config().get_configuration()
>> conf.set_string("pipeline.jars",
>>     "file://<path_to_kafka_connector>.jar;file://<path_to_avro>.jar")
>>
>> I've got:
>> *org.apache.flink.table.api.ValidationException: A definition of an Avro
>> specific record class or Avro schema is required.*
>>
>> - After looking into the pyFlink source code, I've also passed an
>> avro_schema argument to the constructor,
>> Avro(avro_schema=<my_schema_in_string>), and got:
>> *java.lang.ClassNotFoundException: org.apache.avro.io.DatumWriter*
>>
>> - Using the SQL (DDL) declaration documented in [1], I've got the same
>> error.
>>
>> I also have some doubts about how to create the schema, but I need Avro
>> to work first.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html
>>
>> Thanks,
>> Rodrigo
>>

--
*Rodrigo S. Oliveira Brochado*
*Data Scientist*
rodrigo.broch...@predito.com.br
+55 31 99463-2014
www.predito.com.br <https://predito.com.br/>
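P.S. Here is the sketch I mentioned above for the union-type schema. The .avsc file name and the record/field names are hypothetical examples, and I don't know yet whether PyFlink accepts a union here, which is exactly my question:

# hypothetical schema file, e.g. my_sink_schema.avsc:
# {
#   "type": "record",
#   "name": "OutputRecord",
#   "namespace": "com.example",
#   "fields": [
#     {"name": "outf", "type": ["null", "bytes"], "default": null}
#   ]
# }

with open("my_sink_schema.avsc") as f:
    avro_schema = f.read()

(t_env.connect(
        Kafka()
        .version("universal")
        .topic("output")
        .property("bootstrap.servers", "localhost:9092")
        .property("zookeeper.connect", "localhost:2181")
    )
    .with_format(
        # same Avro descriptor as above, fed from the .avsc file
        Avro(avro_schema=avro_schema)
    )
    .with_schema(
        Schema()
        .field("outf", DataTypes.BYTES())
    )
    .create_temporary_table("mySink"))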