Hi guys,

Could you help me set up a Kafka sink connector using the Avro format? I'm
using pyFlink 1.11 and OpenJDK 11, and running the job locally (python
my_script.py).

My code is simple: it calls a UDTF that for now only yields a small fixed
binary value. I use Kafka connectors for both the source and the sink. The
source uses the Json format, which works as expected. The sink works with
the Csv format:

(t_env.connect(  # declare the external system to connect to
        Kafka()
        .version("universal")
        .topic("output")
        .property("bootstrap.servers", "localhost:9092")
        .property("zookeeper.connect", "localhost:2181")
    )
    .with_format(  # declare a format for this system
        Csv()  # Csv converts bytes to a base64 string
    )
    .with_schema(  # declare the schema of the table
        Schema()
        .field("outf", DataTypes.BYTES())
    )
    .create_temporary_table("mySink"))

The Csv format converts the bytes (varbinary) to a base64 string, which is
expected but not what I want.
With the Avro format, I just get errors:

- Just replacing Csv() with Avro() in the code above, and adding Avro's
dependency with
    conf = t_env.get_config().get_configuration()
    conf.set_string("pipeline.jars",
        "file://<path_to_kafka_connector>.jar;file://<path_to_kafka_connector>.jar")
I got:
*org.apache.flink.table.api.ValidationException: A definition of an Avro
specific record class or Avro schema is required.*

- After looking at the pyFlink source code, I also passed an avro_schema
argument to the constructor, Avro(avro_schema=<my_schema_in_string>), and
got:
*java.lang.ClassNotFoundException: org.apache.avro.io.DatumWriter*

- Using the SQL (DDL) declaration documented in [1], I get the same
ClassNotFoundException.
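
For reference, the "pipeline.jars" option takes a semicolon-separated list of
jar URLs, and a ClassNotFoundException generally means the jar holding that
class is not on the classpath. Below is a minimal sketch of building that
value from local paths. The helper name and the jar file names are
hypothetical placeholders, not the actual artifacts I am using:

```python
from pathlib import Path


def build_pipeline_jars(jar_paths):
    """Join local jar paths into a semicolon-separated list of file:// URLs."""
    # resolve() makes each path absolute, which as_uri() requires.
    return ";".join(Path(p).resolve().as_uri() for p in jar_paths)


# Hypothetical file names, for illustration only.
jars = build_pipeline_jars([
    "/opt/jars/kafka-connector.jar",
    "/opt/jars/avro.jar",
])

# The result would then be passed to the configuration, as in my script:
# conf.set_string("pipeline.jars", jars)
```

Building the string from a list makes it easier to see at a glance which
jars are actually being shipped with the job.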

I also have some doubts about how to create the schema, but I need Avro to
work first.
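
To make the schema question concrete, here is a minimal sketch of an Avro
record schema string for the single "outf" bytes field. The helper name and
the record name "Output" are assumptions for illustration, and I have not
confirmed that this is the exact shape Flink expects (for example, whether
the field should be a nullable union):

```python
import json


def avro_record_schema(record_name, fields):
    """Build an Avro record schema as a JSON string.

    fields is a list of (field_name, avro_type) pairs.
    """
    schema = {
        "type": "record",
        "name": record_name,
        "fields": [{"name": name, "type": avro_type} for name, avro_type in fields],
    }
    return json.dumps(schema)


# "Output" is a hypothetical record name; "outf" matches the table schema above.
schema_str = avro_record_schema("Output", [("outf", "bytes")])

# The string would then be passed to the format descriptor:
# Avro(avro_schema=schema_str)
```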

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html

Thanks,
Rodrigo
