Thank you Xingbo.

I've managed to get it working by adding the Avro jar and the three artifacts
from the *com.fasterxml.jackson.core* group [1]. Is it also required to add
the jackson-mapper-asl jar? As for joda-time, I suppose it won't be
required, as I won't use date types in my Avro schema.
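
For reference, a minimal sketch of how the jar list can be turned into the
value for the "pipeline.jars" option, assuming the same ";"-separated
file:// URL form used further down in this thread. The helper name
build_pipeline_jars and the /opt/flink-jars paths are hypothetical; only the
artifact names come from the jars mentioned above.

```python
def build_pipeline_jars(jar_paths):
    """Join absolute jar paths into a ';'-separated list of file:// URLs."""
    urls = []
    for path in jar_paths:
        if not path.startswith("/"):
            raise ValueError("expected an absolute path, got: " + path)
        urls.append("file://" + path)
    return ";".join(urls)


# Hypothetical locations; versions are left out on purpose.
jars = [
    "/opt/flink-jars/avro.jar",
    "/opt/flink-jars/jackson-core.jar",
    "/opt/flink-jars/jackson-databind.jar",
    "/opt/flink-jars/jackson-annotations.jar",
]
pipeline_jars = build_pipeline_jars(jars)

# With a table environment at hand, the value would then be set like this:
#   conf = t_env.get_config().get_configuration()
#   conf.set_string("pipeline.jars", pipeline_jars)
```

The Kafka connector jar would go into the same list.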

About using Avro, I'd like to know if pyFlink supports the Avro union type.
I've found this old e-mail [2] that mentions it, but for Java. If pyFlink
supports it, how would I declare the schema? Can I define the schema in an
external .avsc file and import it, maybe through Avro(avro_schema)?
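
To make the question concrete, here is a sketch of what I have in mind,
assuming that Avro(avro_schema=...) takes the schema as a JSON string. The
record name, namespace and the load_avsc helper are hypothetical, and whether
Flink's Avro format accepts the ["null", "bytes"] union is exactly what I
don't know.

```python
import json
import tempfile
from pathlib import Path

# Hypothetical record schema; "outf" mirrors the sink field in my job.
# In Avro's JSON schema syntax, a union is written as a list of types.
schema = {
    "type": "record",
    "name": "Output",
    "namespace": "example",
    "fields": [
        {"name": "outf", "type": ["null", "bytes"], "default": None},
    ],
}


def load_avsc(path):
    """Read an .avsc file and return its content as a schema string."""
    text = Path(path).read_text(encoding="utf-8")
    json.loads(text)  # fail early if the file is not valid JSON
    return text


# Write the schema to a temporary .avsc file, then load it back.
avsc_path = Path(tempfile.mkdtemp()) / "output.avsc"
avsc_path.write_text(json.dumps(schema, indent=2), encoding="utf-8")
avro_schema = load_avsc(avsc_path)

# With pyFlink installed, the string would then be passed on like this:
#   from pyflink.table.descriptors import Avro
#   fmt = Avro(avro_schema=avro_schema)
```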

[1] https://search.maven.org/search?q=g:com.fasterxml.jackson.core
[2] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/AVRO-Union-type-support-in-Flink-td14318.html

Thanks,
Rodrigo

On Tue, Aug 11, 2020 at 12:09 AM Xingbo Huang <hxbks...@gmail.com> wrote:

> Hi Rodrigo,
>
> Flink doesn't support an avro uber jar, so you need to add all dependency
> jars manually, such as avro, jackson-core-asl, jackson-mapper-asl and
> joda-time in release-1.11.
> However, I found that there was a JIRA[1] that provided a default version
> of avro uber jar a few days ago.
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-18802
>
> Best,
> Xingbo
>
> Rodrigo de Souza Oliveira Brochado <rodrigo.broch...@predito.com.br>
> wrote on Tue, Aug 11, 2020 at 7:30 AM:
>
>> Hi guys,
>>
>> Could you help me set up a Kafka sink connector using the Avro format? I'm
>> using pyFlink 1.11, OpenJDK 11, and executing the job locally (python
>> my_script.py).
>>
>> My code is simple, with a call to a UDTF that for now only yields a small
>> fixed binary. I use Kafka source and sink connectors. The source uses the
>> Json format, which works as expected. The sink works with the Csv format:
>>
>> (t_env.connect(  # declare the external system to connect to
>>         Kafka()
>>         .version("universal")
>>         .topic("output")
>>         .property("bootstrap.servers", "localhost:9092")
>>         .property("zookeeper.connect", "localhost:2181")
>>     )
>>     .with_format(  # declare a format for this system
>>         Csv()  # Csv converts bytes to a base64 string
>>     )
>>     .with_schema(  # declare the schema of the table
>>         Schema()
>>         .field("outf", DataTypes.BYTES())
>>     )
>>     .create_temporary_table("mySink"))
>>
>> The Csv format converts the bytes (varbinary) to a base64 string, which is
>> expected but not what I want.
>> With the Avro format, I just get errors.
>>
>> - Just replacing Csv() with Avro() in the code above, and adding Avro's
>> dependency with
>>     conf = t_env.get_config().get_configuration()
>>     conf.set_string("pipeline.jars",
>>         "file://<path_to_kafka_connector>.jar;file://<path_to_kafka_connector>.jar"
>>     )
>> I've got:
>> *org.apache.flink.table.api.ValidationException: A definition of an Avro
>> specific record class or Avro schema is required.*
>>
>> - After looking in the pyFlink source code, I've also passed an
>> avro_schema argument to the constructor:
>> Avro(avro_schema=<my_schema_in_string>) and got
>> *java.lang.ClassNotFoundException: org.apache.avro.io.DatumWriter*
>>
>> - Using the SQL (DDL) declaration documented in [1], I got the same error
>> as in the previous attempt.
>>
>> I also have some doubts about how to create the schema, but I need Avro
>> to work first.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html
>>
>> Thanks,
>> Rodrigo
>>
>>

-- 

*Rodrigo S. Oliveira Brochado*
*Data Scientist*
rodrigo.broch...@predito.com.br
+55 31 99463-2014
www.predito.com.br <https://predito.com.br/>
